From 774aaee6dd43928d6433451f38becb31464a415f Mon Sep 17 00:00:00 2001 From: samland1116 Date: Wed, 10 Dec 2025 14:05:26 -0600 Subject: [PATCH 1/5] adds warning to clone from s3 --- src/teehr/loading/s3/clone_from_s3.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/teehr/loading/s3/clone_from_s3.py b/src/teehr/loading/s3/clone_from_s3.py index 61772119..ce4df3f0 100644 --- a/src/teehr/loading/s3/clone_from_s3.py +++ b/src/teehr/loading/s3/clone_from_s3.py @@ -49,6 +49,14 @@ def subset_the_table( """Subset the dataset based on location and start/end time.""" if table.name == "locations" and primary_location_ids is not None: sdf_in = sdf_in.filter(sdf_in.id.isin(primary_location_ids)) + # warn user if any primary_location_ids were excluded + available_ids = set(sdf_in.select("id").rdd.flatMap(lambda x: x).collect()) + missing_ids = set(primary_location_ids) - available_ids + if len(missing_ids) > 0: + logger.warning( + f"The following primary_location_ids were not found in the \ + locations table and will be excluded: {missing_ids}" + ) elif table.name == "location_attributes" and primary_location_ids is not None: sdf_in = sdf_in.filter(sdf_in.location_id.isin(primary_location_ids)) elif table.name == "location_crosswalks" and primary_location_ids is not None: From 3027419972f9d2430b1ebea2bd0f383c60348473 Mon Sep 17 00:00:00 2001 From: samland1116 Date: Thu, 11 Dec 2025 12:11:36 -0600 Subject: [PATCH 2/5] adds warning for empty TS --- src/teehr/loading/s3/clone_from_s3.py | 38 ++++++++++++++++++++------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/teehr/loading/s3/clone_from_s3.py b/src/teehr/loading/s3/clone_from_s3.py index ce4df3f0..413af261 100644 --- a/src/teehr/loading/s3/clone_from_s3.py +++ b/src/teehr/loading/s3/clone_from_s3.py @@ -50,16 +50,21 @@ def subset_the_table( if table.name == "locations" and primary_location_ids is not None: sdf_in = sdf_in.filter(sdf_in.id.isin(primary_location_ids)) # warn user if any primary_location_ids were excluded - available_ids = set(sdf_in.select("id").rdd.flatMap(lambda x: x).collect()) + available_ids = set( + sdf_in.select("id").rdd.flatMap( + lambda x: x).collect() + ) missing_ids = set(primary_location_ids) - available_ids if len(missing_ids) > 0: logger.warning( - f"The following primary_location_ids were not found in the \ - locations table and will be excluded: {missing_ids}" + "The following primary_location_ids were not found in the " + f"locations table and will be excluded: {missing_ids}" ) - elif table.name == "location_attributes" and primary_location_ids is not None: + elif table.name == "location_attributes" and \ + primary_location_ids is not None: sdf_in = sdf_in.filter(sdf_in.location_id.isin(primary_location_ids)) - elif table.name == "location_crosswalks" and primary_location_ids is not None: + elif table.name == "location_crosswalks" and \ + primary_location_ids is not None: sdf_in = sdf_in.filter( sdf_in.primary_location_id.isin(primary_location_ids) ) @@ -68,11 +73,23 @@ def subset_the_table( sdf_in = sdf_in.filter( sdf_in.location_id.isin(primary_location_ids) ) + # warn user if no timeseries data exists for a gage + available_ids = set( + sdf_in.select("location_id").rdd.flatMap( + lambda x: x).collect() + ) + missing_ids = set(primary_location_ids) - available_ids + if len(missing_ids) > 0: + logger.warning( + f"The following primary_location_ids have no data in the " + f"primary_timeseries table: {missing_ids}" + ) elif table.name == "secondary_timeseries": if primary_location_ids is not None: secondary_ids = ( ev.location_crosswalks.to_sdf() - .select("secondary_location_id").rdd.flatMap(lambda x: x).collect() + .select("secondary_location_id").rdd.flatMap( + lambda x: x).collect() ) sdf_in = sdf_in.filter(sdf_in.location_id.isin(secondary_ids)) elif table.name == "joined_timeseries": @@ -127,7 +144,6 @@ def clone_from_s3( Note: future version will allow subsetting the tables to clone. """ - # Make the Evaluation directories logger.info(f"Creating directories for evaluation: {evaluation_name}") Path(ev.cache_dir).mkdir() @@ -182,7 +198,10 @@ def clone_from_s3( logger.debug(f"Making directory {table.dir}") Path(table.dir).mkdir() - logger.debug(f"Cloning {table.name} from {s3_dataset_path}/{table.name}/ to {table.dir}") + logger.debug( + f"Cloning {table.name} from {s3_dataset_path}/{table.name}/ to " + f"{table.dir}" + ) sdf_in = table._read_files( path=f"{s3_dataset_path}/{table.name}/", @@ -208,7 +227,8 @@ def clone_from_s3( dest = f"{ev.scripts_dir}/user_defined_fields.py" logger.debug(f"Copying from {source}/ to {dest}") - # ToDo: there is a permission issue that prevents copying the entire directory. + # ToDo: there is a permission issue that prevents copying the entire + # directory. # This works for now. with fsspec.open(source, 'r', anon=True) as file: with open(dest, 'w') as f: From 3c917548cfaa135266d7d30c1014d508b7eec10b Mon Sep 17 00:00:00 2001 From: samland1116 Date: Fri, 12 Dec 2025 13:47:23 -0600 Subject: [PATCH 3/5] updates tests --- tests/fetch/test_nwm_fetch_and_format.py | 15 ++++++++++----- tests/query/test_get_metrics_query.py | 14 ++++++++------ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/tests/fetch/test_nwm_fetch_and_format.py b/tests/fetch/test_nwm_fetch_and_format.py index f27d7fdc..351af2f9 100644 --- a/tests/fetch/test_nwm_fetch_and_format.py +++ b/tests/fetch/test_nwm_fetch_and_format.py @@ -49,7 +49,8 @@ def test_nwm22_point_fetch_and_format(tmpdir): overwrite_output=True, nwm_version="nwm22", variable_mapper=TEST_NWM_VARIABLE_MAPPER, - timeseries_type="secondary" + timeseries_type="secondary", + drop_overlapping_assimilation_values=False ) parquet_file = Path(tmpdir, "20230318T14.parquet") @@ -90,7 +91,8 @@ def test_nwm30_point_fetch_and_format(tmpdir): ignore_missing_file=False, overwrite_output=True, variable_mapper=TEST_NWM_VARIABLE_MAPPER, - timeseries_type="secondary" + timeseries_type="secondary", + drop_overlapping_assimilation_values=False ) parquet_file = Path(tmpdir, "20231101T00.parquet") @@ -130,7 +132,8 @@ def test_nwm30_point_fetch_and_format_medium_range_member(tmpdir): ignore_missing_file=False, overwrite_output=True, variable_mapper=TEST_NWM_VARIABLE_MAPPER, - timeseries_type="secondary" + timeseries_type="secondary", + drop_overlapping_assimilation_values=False ) parquet_file = Path(tmpdir, "20240222T00.parquet") @@ -163,7 +166,8 @@ def test_nwm22_grid_fetch_and_format(tmpdir): overwrite_output=True, location_id_prefix=None, variable_mapper=None, - timeseries_type="primary" + timeseries_type="primary", + drop_overlapping_assimilation_values=False ) parquet_file = Path(tmpdir, "20201218T00.parquet") @@ -205,7 +209,8 @@ def test_nwm30_grid_fetch_and_format(tmpdir): overwrite_output=True, location_id_prefix=None, variable_mapper=TEST_NWM_VARIABLE_MAPPER, - timeseries_type="primary" + timeseries_type="primary", + drop_overlapping_assimilation_values=False ) parquet_file = Path(tmpdir, "20231101T00.parquet") diff --git a/tests/query/test_get_metrics_query.py b/tests/query/test_get_metrics_query.py index ac86fc89..80d6b26a 100644 --- a/tests/query/test_get_metrics_query.py +++ b/tests/query/test_get_metrics_query.py @@ -490,18 +490,20 @@ def test_ensemble_metrics(tmpdir): ).to_pandas() # check CRPS values - assert np.isclose(metrics_df.mean_crps_ensemble.values[0], 22.050798) - assert np.isclose(metrics_df.mean_crps_ensemble.values[1], 22.383705) assert np.isclose( - metrics_df.mean_crps_ensemble_skill_score.values[0], 0.20777595 + metrics_df.mean_crps_ensemble.values[0], 22.0508, atol=1e-04 + ) + assert np.isclose( + metrics_df.mean_crps_ensemble_skill_score.values[0], 0.2078, atol=1e-04 ) assert np.isnan(metrics_df.mean_crps_ensemble_skill_score.values[2]) # check Brier Score values - assert np.isclose(metrics_df.mean_brier_score.values[0], 0.18979715) - assert np.isclose(metrics_df.mean_brier_score.values[1], 0.19405437) assert np.isclose( - metrics_df.mean_brier_score_skill_score.values[0], 0.26453602 + metrics_df.mean_brier_score.values[0], 0.1898, atol=1e-04 + ) + assert np.isclose( + metrics_df.mean_brier_score_skill_score.values[0], 0.2645, atol=1e-04 ) assert np.isnan( metrics_df.mean_brier_score_skill_score.values[2] From 558dfc0c781453159a4b54914844dbdd60dad857 Mon Sep 17 00:00:00 2001 From: samland1116 Date: Fri, 12 Dec 2025 15:42:07 -0600 Subject: [PATCH 4/5] updates tests again --- tests/query/test_get_metrics_query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/query/test_get_metrics_query.py b/tests/query/test_get_metrics_query.py index 80d6b26a..95135928 100644 --- a/tests/query/test_get_metrics_query.py +++ b/tests/query/test_get_metrics_query.py @@ -491,7 +491,7 @@ def test_ensemble_metrics(tmpdir): # check CRPS values assert np.isclose( - metrics_df.mean_crps_ensemble.values[0], 22.0508, atol=1e-04 + metrics_df.mean_crps_ensemble.values[1], 22.3837, atol=1e-04 ) assert np.isclose( metrics_df.mean_crps_ensemble_skill_score.values[0], 0.2078, atol=1e-04 From bf15f64edc5540c6b43bf16f3422c958a5e36de9 Mon Sep 17 00:00:00 2001 From: samland1116 Date: Wed, 7 Jan 2026 10:39:16 -0600 Subject: [PATCH 5/5] remove redundant warning --- src/teehr/loading/s3/clone_from_s3.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/teehr/loading/s3/clone_from_s3.py b/src/teehr/loading/s3/clone_from_s3.py index 413af261..07a34acc 100644 --- a/src/teehr/loading/s3/clone_from_s3.py +++ b/src/teehr/loading/s3/clone_from_s3.py @@ -73,17 +73,6 @@ def subset_the_table( sdf_in = sdf_in.filter( sdf_in.location_id.isin(primary_location_ids) ) - # warn user if no timeseries data exists for a gage - available_ids = set( - sdf_in.select("location_id").rdd.flatMap( - lambda x: x).collect() - ) - missing_ids = set(primary_location_ids) - available_ids - if len(missing_ids) > 0: - logger.warning( - f"The following primary_location_ids have no data in the " - f"primary_timeseries table: {missing_ids}" - ) elif table.name == "secondary_timeseries": if primary_location_ids is not None: secondary_ids = (