From b6c80d9b3ec13e82bf1443707cd043446c8fdd87 Mon Sep 17 00:00:00 2001
From: Novotny <1533907136121002@mil>
Date: Tue, 7 Oct 2025 15:10:26 -0500
Subject: [PATCH 01/14] fix message from post

---
 cwms/api.py                                 | 40 ++++++++-----
 cwms/timeseries/timeseries.py               | 23 +++----
 tests/cda/timeseries/timeseries_CDA_test.py | 66 ++++++++-------
 3 files changed, 61 insertions(+), 68 deletions(-)

diff --git a/cwms/api.py b/cwms/api.py
index 022f9e6e..8cc34691 100644
--- a/cwms/api.py
+++ b/cwms/api.py
@@ -305,6 +305,25 @@ def get_with_paging(
     return response
 
 
+def _post_function(
+    endpoint: str,
+    data: Any,
+    params: Optional[RequestParams] = None,
+    *,
+    api_version: int = API_VERSION,
+) -> Any:
+
+    # post requires different headers than get for
+    headers = {"accept": "*/*", "Content-Type": api_version_text(api_version)}
+    if isinstance(data, dict) or isinstance(data, list):
+        data = json.dumps(data)
+    with SESSION.post(endpoint, params=params, headers=headers, data=data) as response:
+        if not response.ok:
+            logging.error(f"CDA Error: response={response}")
+            raise ApiError(response)
+        return response
+
+
 def post(
     endpoint: str,
     data: Any,
@@ -329,11 +348,7 @@ def post(
     Raises:
         ApiError: If an error response is return by the API.
     """
-
-    post_with_returned_data(
-        endpoint=endpoint, data=data, params=params, api_version=api_version
-    )
-    return None
+    _post_function(endpoint=endpoint, data=data, params=params, api_version=api_version)
 
 
 def post_with_returned_data(
@@ -361,17 +376,10 @@ def post_with_returned_data(
         ApiError: If an error response is return by the API.
     """
 
-    # post requires different headers than get for
-    headers = {"accept": "*/*", "Content-Type": api_version_text(api_version)}
-
-    if isinstance(data, dict) or isinstance(data, list):
-        data = json.dumps(data)
-
-    with SESSION.post(endpoint, params=params, headers=headers, data=data) as response:
-        if not response.ok:
-            logging.error(f"CDA Error: response={response}")
-            raise ApiError(response)
-        return _process_response(response)
+    response = _post_function(
+        endpoint=endpoint, data=data, params=params, api_version=api_version
+    )
+    return _process_response(response)
 
 
 def patch(
diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 4d69a447..9cdfc158 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -1,4 +1,5 @@
 import concurrent.futures
+import logging
 from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, List, Optional, Tuple
 
@@ -82,7 +83,7 @@ def get_ts_ids(ts_id: str) -> Any:
             }
             return result_dict
         except Exception as e:
-            print(f"Error processing {ts_id}: {e}")
+            logging.error(f"Error processing {ts_id}: {e}")
             return None
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -186,7 +187,7 @@ def fetch_timeseries_chunks(
             except Exception as e:
                 # Log or handle any errors that occur during execution
                 chunk_start, chunk_end = future_to_chunk[future]
-                print(
+                logging.error(
                     f"ERROR: Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
                 )
     return results
@@ -371,13 +372,13 @@ def get_timeseries(
             # replace begin with begin extent if outside extents
             if begin < begin_extent:
                 begin = begin_extent
-                print(
-                    f"INFO: Requested begin was before any data in this timeseries. Reseting to {begin}"
+                logging.debug(
+                    f"Requested begin was before any data in this timeseries. Reseting to {begin}"
                 )
         except Exception as e:
             # If getting extents fails, fall back to single-threaded mode
-            print(
-                f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode."
+            logging.debug(
+                f"Could not retrieve time series extents ({e}). Falling back to single-threaded mode."
             )
 
         response = api.get_with_paging(
@@ -398,8 +399,8 @@ def get_timeseries(
         )
         return Data(response, selector=selector)
     else:
-        print(
-            f"INFO: Fetching {len(chunks)} chunks of timeseries data with {max_workers} threads"
+        logging.debug(
+            f"Fetching {len(chunks)} chunks of timeseries data with {max_workers} threads"
         )
         # fetch the data
         result_list = fetch_timeseries_chunks(
@@ -667,7 +668,7 @@ def store_timeseries(
         return api.post(endpoint, data, params)
 
     actual_workers = min(max_workers, len(chunks))
-    print(
+    logging.debug(
         f"INFO: Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads"
     )
 
@@ -693,7 +694,9 @@ def store_timeseries(
             except Exception as e:
                 start_time = chunk["values"][0][0]
                 end_time = chunk["values"][-1][0]
-                print(f"Error storing chunk from {start_time} to {end_time}: {e}")
+                logging.error(
+                    f"Error storing chunk from {start_time} to {end_time}: {e}"
+                )
                 responses.append({"error": str(e)})
 
     return
diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index 0884c7aa..79835da8 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -160,59 +160,41 @@ def test_store_timeseries_multi_chunk_ts():
     # Convert DataFrame to JSON format
     ts_json = ts.timeseries_df_to_json(DF_CHUNK_MULTI, ts_id, units, office)
 
-    # Capture the log output
-    with patch("builtins.print") as mock_print:
-        ts.store_timeseries(ts_json, multithread=True, chunk_size=2 * 7 * 24 * 4)
-
-    # Extract the log messages
-    log_messages = [call.args[0] for call in mock_print.call_args_list]
-
-    # Find the relevant log message
-    store_log = next((msg for msg in log_messages if "INFO: Storing" in msg), None)
-    assert store_log is not None, "Expected log message not found"
-
-    # Parse the number of chunks and threads
-    chunks = int(store_log.split("chunks")[0].split()[-1])
-    threads = int(store_log.split("with")[1].split()[0])
+    ts.store_timeseries(ts_json, multithread=True, chunk_size=2 * 7 * 24 * 4)
+
+    data_multithread = ts.get_timeseries(
+        ts_id=TEST_TSID_CHUNK_MULTI,
+        office_id=TEST_OFFICE,
+        begin=START_DATE_CHUNK_MULTI,
+        end=END_DATE_CHUNK_MULTI,
+        max_days_per_chunk=14,
+        unit="SI",
+    )
 
-    # Assert the expected values
-    assert chunks == 5, f"Expected 5 chunks, but got {chunks}"
-    assert threads == 5, f"Expected 5 threads, but got {threads}"
+    # make sure the dataframe matches stored dataframe
+    pdt.assert_frame_equal(
+        data_multithread.df, DF_CHUNK_MULTI
+    ), f"Data frames do not match: original = {DF_CHUNK_MULTI.describe()}, stored = {df.describe()}"
 
 
 def test_read_timeseries_multi_chunk_ts():
     # Capture the log output
-    with patch("builtins.print") as mock_print:
-        data_multithread = ts.get_timeseries(
-            ts_id=TEST_TSID_CHUNK_MULTI,
-            office_id=TEST_OFFICE,
-            begin=START_DATE_CHUNK_MULTI,
-            end=END_DATE_CHUNK_MULTI,
-            max_days_per_chunk=14,
-            unit="SI",
-        )
-
-    # Extract the log messages
-    log_messages = [call.args[0] for call in mock_print.call_args_list]
-
-    # Find the relevant log message
-    read_log = next((msg for msg in log_messages if "INFO: Fetching" in msg), None)
-    assert read_log is not None, "Expected log message not found"
-
-    # Parse the number of chunks and threads
-    chunks = int(read_log.split("chunks")[0].split()[-1])
-    threads = int(read_log.split("with")[1].split()[0])
-
-    # Assert the expected values
-    assert chunks == 5, f"Expected 5 chunks, but got {chunks}"
-    assert threads == 5, f"Expected 5 threads, but got {threads}"
+    data_multithread = ts.get_timeseries(
+        ts_id=TEST_TSID_CHUNK_MULTI,
+        office_id=TEST_OFFICE,
+        begin=START_DATE_CHUNK_MULTI,
+        end=END_DATE_CHUNK_MULTI,
+        max_days_per_chunk=14,
+        unit="SI",
+    )
 
     # Check metadata for multithreaded read
     data_json = data_multithread.json
 
-    # check df values
     df = data_multithread.df.copy()
+    assert df is not None, "Returned DataFrame is None"
+    assert not df.empty, "Returned DataFrame is empty"
 
     # make sure the dataframe matches stored dataframe
     pdt.assert_frame_equal(

From 8a3fec856ad2fdb8e457a35d41f1228f3b21804d Mon Sep 17 00:00:00 2001
From: Novotny <1533907136121002@mil>
Date: Tue, 7 Oct 2025 15:17:20 -0500
Subject: [PATCH 02/14] fix test error

---
 tests/cda/timeseries/timeseries_CDA_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index 79835da8..c20b475c 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -170,10 +170,10 @@ def test_store_timeseries_multi_chunk_ts():
         max_days_per_chunk=14,
         unit="SI",
     )
-
+    df = data_multithread.df
     # make sure the dataframe matches stored dataframe
     pdt.assert_frame_equal(
-        data_multithread.df, DF_CHUNK_MULTI
+        df, DF_CHUNK_MULTI
     ), f"Data frames do not match: original = {DF_CHUNK_MULTI.describe()}, stored = {df.describe()}"
 
 

From 6756e0bba9ac33941be976b99a0c0809ca983036 Mon Sep 17 00:00:00 2001
From: Mike Perryman
Date: Wed, 8 Oct 2025 09:58:21 -0500
Subject: [PATCH 03/14] Make compatible with python 3.9, keep max_workers from
 being zero.

---
 cwms/projects/water_supply/accounting.py | 3 ++-
 cwms/timeseries/timeseries.py            | 8 ++++----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/cwms/projects/water_supply/accounting.py b/cwms/projects/water_supply/accounting.py
index 89fd4668..368b888a 100644
--- a/cwms/projects/water_supply/accounting.py
+++ b/cwms/projects/water_supply/accounting.py
@@ -4,6 +4,7 @@
 # Source may not be released without written approval from HEC
 import cwms.api as api
 from cwms.cwms_types import JSON, Data
+from typing import Union
 
 
 def get_pump_accounting(
@@ -74,7 +75,7 @@ def get_pump_accounting(
 
     endpoint = f"projects/{office_id}/{project_id}/water-user/{water_user}/contracts/{contract_name}/accounting"
 
-    params: dict[str, str | int] = {
+    params: dict[str, Union[str,int]] = {
         "start": start,
         "end": end,
         "timezone": timezone,
diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 9cdfc158..cfc729a1 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -149,11 +149,11 @@ def fetch_timeseries_chunks(
     chunks: List[Tuple[datetime, datetime]],
     ts_id: str,
     office_id: str,
-    unit: str | None,
+    unit: Optional[str],
     datum: Optional[str],
-    page_size: int | None,
+    page_size: Optional[None],
     version_date: Optional[datetime],
-    trim: bool | None,
+    trim: Optional[bool],
     max_workers: int,
 ) -> List[Data]:
     # Initialize an empty list to store results
@@ -390,7 +390,7 @@ def get_timeseries(
     chunks = chunk_timeseries_time_range(begin, end, timedelta(days=max_days_per_chunk))
 
     # find max worker thread
-    max_workers = min(len(chunks), max_workers)
+    max_workers = max(min(len(chunks), max_workers), 1)
 
     # if not multithread
     if max_workers == 1 or not multithread:

From 791f42f8e0d06590ad0509c58523f8d4957918ec Mon Sep 17 00:00:00 2001
From: Mike Perryman
Date: Wed, 8 Oct 2025 10:04:39 -0500
Subject: [PATCH 04/14] Fix formatting

---
 cwms/projects/water_supply/accounting.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/cwms/projects/water_supply/accounting.py b/cwms/projects/water_supply/accounting.py
index 368b888a..659ecace 100644
--- a/cwms/projects/water_supply/accounting.py
+++ b/cwms/projects/water_supply/accounting.py
@@ -2,9 +2,10 @@
 # United States Army Corps of Engineers - Hydrologic Engineering Center (USACE/HEC)
 # All Rights Reserved. USACE PROPRIETARY/CONFIDENTIAL.
 # Source may not be released without written approval from HEC
+from typing import Union
+
 import cwms.api as api
 from cwms.cwms_types import JSON, Data
-from typing import Union
 
 
 def get_pump_accounting(
@@ -75,7 +76,7 @@ def get_pump_accounting(
 
     endpoint = f"projects/{office_id}/{project_id}/water-user/{water_user}/contracts/{contract_name}/accounting"
 
-    params: dict[str, Union[str,int]] = {
+    params: dict[str, Union[str, int]] = {
         "start": start,
         "end": end,
         "timezone": timezone,

From b769a27fc70d4d7706e4547d300ca0a543502d2c Mon Sep 17 00:00:00 2001
From: Mike Perryman
Date: Wed, 8 Oct 2025 10:08:52 -0500
Subject: [PATCH 05/14] Fix typo

---
 cwms/timeseries/timeseries.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index cfc729a1..6a0b88c7 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -151,7 +151,7 @@ def fetch_timeseries_chunks(
     office_id: str,
     unit: Optional[str],
     datum: Optional[str],
-    page_size: Optional[None],
+    page_size: Optional[int],
     version_date: Optional[datetime],
     trim: Optional[bool],
     max_workers: int,

From f8eb30213296b73f7d20b7df4f6a45bd24a36554 Mon Sep 17 00:00:00 2001
From: Novotny <1533907136121002@mil>
Date: Wed, 8 Oct 2025 12:20:04 -0500
Subject: [PATCH 06/14] add tests for multi_ts get and post with chunks

---
 cwms/timeseries/timeseries.py               | 134 +++-----------------
 tests/cda/timeseries/timeseries_CDA_test.py |  74 ++++++++++-
 2 files changed, 93 insertions(+), 115 deletions(-)

diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 6a0b88c7..0764716e 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -147,13 +147,9 @@ def chunk_timeseries_time_range(
 
 def fetch_timeseries_chunks(
     chunks: List[Tuple[datetime, datetime]],
-    ts_id: str,
-    office_id: str,
-    unit: Optional[str],
-    datum: Optional[str],
-    page_size: Optional[int],
-    version_date: Optional[datetime],
-    trim: Optional[bool],
+    params: dict,
+    selector: str,
+    endpoint: str,
     max_workers: int,
 ) -> List[Data]:
     # Initialize an empty list to store results
@@ -162,28 +158,19 @@ def fetch_timeseries_chunks(
     # Create a ThreadPoolExecutor to manage multithreading
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
         # Submit tasks for each chunk to the api
-        future_to_chunk = {
-            executor.submit(
-                get_timeseries_chunk,
-                ts_id,
-                office_id,
-                unit,
-                datum,
-                chunk_start,
-                chunk_end,
-                page_size,
-                version_date,
-                trim,
-            ): (chunk_start, chunk_end)
-            for chunk_start, chunk_end in chunks
-        }
+        for chunk_start, chunk_end in chunks:
+            params["begin"] = chunk_start.isoformat() if chunk_start else None
+            params["end"] = chunk_end.isoformat() if chunk_end else None
+            future_to_chunk = {
+                executor.submit(api.get_with_paging, selector, endpoint, params)
+            }
 
         # Process completed threads as they finish
         for future in concurrent.futures.as_completed(future_to_chunk):
             try:
                 # Retrieve the result of the completed future
-                result = future.result()
-                results.append(result)
+                response = future.result()
+                results.append(Data(response, selector=selector))
             except Exception as e:
                 # Log or handle any errors that occur during execution
                 chunk_start, chunk_end = future_to_chunk[future]
@@ -193,44 +180,6 @@ def fetch_timeseries_chunks(
     return results
 
 
-def get_timeseries_chunk(
-    ts_id: str,
-    office_id: str,
-    unit: Optional[str] = "EN",
-    datum: Optional[str] = None,
-    begin: Optional[datetime] = None,
-    end: Optional[datetime] = None,
-    page_size: Optional[int] = 300000,
-    version_date: Optional[datetime] = None,
-    trim: Optional[bool] = True,
-) -> Data:
-
-    # creates the dataframe from the timeseries data
-    endpoint = "timeseries"
-    if begin and not isinstance(begin, datetime):
-        raise ValueError("begin needs to be in datetime")
-    if end and not isinstance(end, datetime):
-        raise ValueError("end needs to be in datetime")
-    if version_date and not isinstance(version_date, datetime):
-        raise ValueError("version_date needs to be in datetime")
-    params = {
-        "office": office_id,
-        "name": ts_id,
-        "unit": unit,
-        "datum": datum,
-        "begin": begin.isoformat() if begin else None,
-        "end": end.isoformat() if end else None,
-        "page-size": page_size,
-        "page": None,
-        "version-date": version_date.isoformat() if version_date else None,
-        "trim": trim,
-    }
-    selector = "values"
-
-    response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params)
-    return Data(response, selector=selector)
-
-
 def combine_timeseries_results(results: List[Data]) -> Data:
     """
     Combines the results from multiple chunks into a single cwms Data object.
@@ -349,7 +298,7 @@ def get_timeseries(
     """
 
     selector = "values"
-
+    endpoint = "timeseries"
     params = {
         "office": office_id,
         "name": ts_id,
@@ -382,7 +331,7 @@ def get_timeseries(
             )
 
         response = api.get_with_paging(
-            selector=selector, endpoint="timeseries", params=params
+            selector=selector, endpoint=endpoint, params=params
         )
         return Data(response, selector=selector)
 
@@ -405,13 +354,9 @@ def get_timeseries(
         # fetch the data
         result_list = fetch_timeseries_chunks(
             chunks,
-            ts_id,
-            office_id,
-            unit,
-            datum,
-            page_size,
-            version_date,
-            trim,
+            params,
+            selector,
+            endpoint,
             max_workers,
         )
 
@@ -576,42 +521,6 @@ def chunk_timeseries_data(
     return chunk_list
 
 
-def store_timeseries_chunk(
-    data: JSON,
-    create_as_ltrs: Optional[bool] = False,
-    store_rule: Optional[str] = None,
-    override_protection: Optional[bool] = False,
-) -> None:
-    """
-    Stores a single chunk of time series data.
-
-    Parameters
-    ----------
-    chunk : list
-        A subset of time series values to be stored.
-    create_as_ltrs : bool
-        Flag indicating if timeseries should be created as Local Regular Time Series.
-    store_rule : str
-        The business rule to use when merging the incoming with existing data.
-    override_protection : bool
-        A flag to ignore the protected data quality when storing data.
-
-    Returns
-    -------
-    response
-        API response for the chunk storage.
-    """
-    endpoint = "timeseries"
-    params = {
-        "create-as-lrts": create_as_ltrs,
-        "store-rule": store_rule,
-        "override-protection": override_protection,
-    }
-
-    # Make the API call
-    return api.post(endpoint, data, params)
-
-
 def store_timeseries(
     data: JSON,
     create_as_ltrs: Optional[bool] = False,
@@ -669,7 +578,7 @@ def store_timeseries(
 
     actual_workers = min(max_workers, len(chunks))
     logging.debug(
-        f"INFO: Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads"
+        f"Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads"
     )
 
     # Store chunks concurrently
@@ -680,11 +589,10 @@ def store_timeseries(
         # Submit each chunk as a separate task to the executor
         for chunk in chunks:
             future = executor.submit(
-                store_timeseries_chunk,  # The function to execute
-                chunk,  # The chunk of data to store
-                create_as_ltrs,  # Whether to create as LRTS
-                store_rule,  # The store rule to use
-                override_protection,  # Whether to override protection
+                api.post,  # The function to execute
+                endpoint,  # The chunk of data to store
+                data,
+                params,
             )
             futures.append(future)  # Add the future to the list
diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index c20b475c..4a31cc4b 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -12,6 +12,8 @@
 TEST_LOCATION_ID = "pytest_group"
 TEST_TSID = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Test"
 TEST_TSID_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi"
+TEST_TSID_MULTI1 = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-1"
+TEST_TSID_MULTI2 = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-2"
 TEST_TSID_STORE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Store"
 TEST_TSID_CHUNK_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Chunk"
 TEST_TSID_DELETE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Delete"
@@ -33,6 +35,28 @@
     }
 )
 
+DF_MULTI_TIMESERIES1 = pd.DataFrame(
+    {
+        "date-time": DT_CHUNK_MULTI,
+        "value": [86.57 + (i % 10) * 0.01 for i in range(len(DT_CHUNK_MULTI))],
+        "quality-code": [0] * len(DT_CHUNK_MULTI),
+        "ts_id": [TEST_TSID_MULTI1] * len(DT_CHUNK_MULTI),
+        "units": ["ft"] * len(DT_CHUNK_MULTI),
+    }
+)
+
+DF_MULTI_TIMESERIES2 = pd.DataFrame(
+    {
+        "date-time": DT_CHUNK_MULTI,
+        "value": [86.57 + (i % 10) * 0.01 for i in range(len(DT_CHUNK_MULTI))],
+        "quality-code": [0] * len(DT_CHUNK_MULTI),
+        "ts_id": [TEST_TSID_MULTI2] * len(DT_CHUNK_MULTI),
+        "units": ["ft"] * len(DT_CHUNK_MULTI),
+    }
+)
+
+DF_MULTI_TIMESERIES = pd.concat([DF_MULTI_TIMESERIES1, DF_MULTI_TIMESERIES2])
+
 
 @pytest.fixture(scope="module", autouse=True)
 def setup_data():
@@ -138,6 +162,52 @@ def test_store_multi_timeseries_df():
     assert data2["values"][0][1] == pytest.approx(8)
 
 
+def test_store_multi_timeseries_chunks_df():
+    # test getting multi timeseries while using the chunk method as well
+    ts.store_multi_timeseries_df(data=DF_MULTI_TIMESERIES, office_id=TEST_OFFICE)
+    data1 = ts.get_timeseries(
+        ts_id=TEST_TSID_MULTI1,
+        office_id=TEST_OFFICE,
+        begin=START_DATE_CHUNK_MULTI,
+        end=END_DATE_CHUNK_MULTI,
+    ).df
+    data2 = ts.get_timeseries(
+        ts_id=TEST_TSID_MULTI2,
+        office_id=TEST_OFFICE,
+        begin=START_DATE_CHUNK_MULTI,
+        end=END_DATE_CHUNK_MULTI,
+    ).df
+
+    pdt.assert_frame_equal(
+        data1, DF_MULTI_TIMESERIES1
+    ), f"Data frames do not match: original = {DF_MULTI_TIMESERIES1.describe()}, stored = {data1.describe()}"
+
+    pdt.assert_frame_equal(
+        data2, DF_MULTI_TIMESERIES2
+    ), f"Data frames do not match: original = {DF_MULTI_TIMESERIES2.describe()}, stored = {data2.describe()}"
+
+
+def test_get_multi_timeseries_chunk_df():
+    df = ts.get_multi_timeseries_df(
+        ts_ids=[TEST_TSID_MULTI1, TEST_TSID_MULTI2],
+        office_id=TEST_OFFICE,
+        begin=START_DATE_CHUNK_MULTI,
+        end=END_DATE_CHUNK_MULTI,
+    )
+    assert df is not None, "Returned DataFrame is None"
+    assert not df.empty, "Returned DataFrame is empty"
+    assert any(
+        TEST_TSID_MULTI1 in str(col) for col in df.columns
+    ), f"{TEST_TSID_MULTI1} not found in DataFrame columns"
+    assert any(
+        TEST_TSID_MULTI2 in str(col) for col in df.columns
+    ), f"{TEST_TSID_MULTI2} not found in DataFrame columns"
+
+    pdt.assert_frame_equal(
+        df, DF_MULTI_TIMESERIES
+    ), f"Data frames do not match: original = {DF_MULTI_TIMESERIES.describe()}, stored = {df.describe()}"
+
+
 def test_get_multi_timeseries_df():
     ts_id_rev_test = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
     df = ts.get_multi_timeseries_df([TEST_TSID_MULTI, ts_id_rev_test], TEST_OFFICE)
@@ -151,7 +221,7 @@ def test_get_multi_timeseries_df():
     ), f"{ts_id_rev_test} not found in DataFrame columns"
 
 
-def test_store_timeseries_multi_chunk_ts():
+def test_store_timeseries_chunk_ts():
     # Define parameters
     ts_id = TEST_TSID_CHUNK_MULTI
     office = TEST_OFFICE
@@ -178,7 +248,7 @@ def test_store_timeseries_multi_chunk_ts():
 
 
-def test_read_timeseries_multi_chunk_ts():
+def test_read_timeseries_chunk_ts():
     # Capture the log output
     data_multithread = ts.get_timeseries(
         ts_id=TEST_TSID_CHUNK_MULTI,

From 561ea409ac3a240792d8891eb5041d3f302360e9 Mon Sep 17 00:00:00 2001
From: Novotny <1533907136121002@mil>
Date: Wed, 8 Oct 2025 12:41:51 -0500
Subject: [PATCH 07/14] fix mypy

---
 cwms/timeseries/timeseries.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 0764716e..ef9c4a84 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -147,7 +147,7 @@ def chunk_timeseries_time_range(
 
 def fetch_timeseries_chunks(
     chunks: List[Tuple[datetime, datetime]],
-    params: dict,
+    params: Dict[str, Any],
     selector: str,
     endpoint: str,
     max_workers: int,
@@ -173,10 +173,7 @@ def fetch_timeseries_chunks(
                 results.append(Data(response, selector=selector))
             except Exception as e:
                 # Log or handle any errors that occur during execution
-                chunk_start, chunk_end = future_to_chunk[future]
-                logging.error(
-                    f"ERROR: Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
-                )
+                logging.error(f"ERROR: Failed to fetch chunk of data from: {e}")
     return results

From 8db8a0527ae1008755a3acaba86c8efd977686ed Mon Sep 17 00:00:00 2001
From: Novotny <1533907136121002@mil>
Date: Wed, 8 Oct 2025 15:58:32 -0500
Subject: [PATCH 08/14] fix tests

---
 cwms/timeseries/timeseries.py               | 43 ++++++++++++++++------
 tests/cda/timeseries/timeseries_CDA_test.py | 10 +++++--
 2 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index ef9c4a84..14e5bc3b 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -145,6 +145,16 @@ def chunk_timeseries_time_range(
     return chunks
 
 
+def get_timeseries_chunk(
+    selector: str, endpoint: str, param: Dict[str, Any], begin: datetime, end: datetime
+) -> Data:
+
+    param["begin"] = begin.isoformat() if begin else None
+    param["end"] = end.isoformat() if end else None
+    response = api.get_with_paging(selector=selector, endpoint=endpoint, params=param)
+    return Data(response, selector=selector)
+
+
 def fetch_timeseries_chunks(
     chunks: List[Tuple[datetime, datetime]],
     params: Dict[str, Any],
@@ -151,29 +161,38 @@ def fetch_timeseries_chunks(
     selector: str,
     endpoint: str,
     max_workers: int,
 ) -> List[Data]:
+
     # Initialize an empty list to store results
     results = []
 
     # Create a ThreadPoolExecutor to manage multithreading
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
         # Submit tasks for each chunk to the api
-        for chunk_start, chunk_end in chunks:
-            params["begin"] = chunk_start.isoformat() if chunk_start else None
-            params["end"] = chunk_end.isoformat() if chunk_end else None
-            future_to_chunk = {
-                executor.submit(api.get_with_paging, selector, endpoint, params)
-            }
+        future_to_chunk = {
+            executor.submit(
+                get_timeseries_chunk,
+                selector,
+                endpoint,
+                params.copy(),
+                chunk_start,
+                chunk_end,
+            ): (chunk_start, chunk_end)
+            for chunk_start, chunk_end in chunks
+        }
 
         # Process completed threads as they finish
        for future in concurrent.futures.as_completed(future_to_chunk):
             try:
                 # Retrieve the result of the completed future
-                response = future.result()
-                results.append(Data(response, selector=selector))
+                result = future.result()
+                results.append(result)
             except Exception as e:
+                chunk_start, chunk_end = future_to_chunk[future]
                 # Log or handle any errors that occur during execution
-                logging.error(f"ERROR: Failed to fetch chunk of data from: {e}")
+                logging.error(
+                    f"ERROR: Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
+                )
     return results
 
 
@@ -434,7 +453,9 @@ def timeseries_df_to_json(
 
 
 def store_multi_timeseries_df(
-    data: pd.DataFrame, office_id: str, max_workers: Optional[int] = 30
+    data: pd.DataFrame,
+    office_id: str,
+    max_workers: Optional[int] = 30,
 ) -> None:
     def store_ts_ids(
         data: pd.DataFrame,
@@ -452,7 +473,7 @@ def store_ts_ids(
                 office_id=office_id,
                 version_date=version_date,
             )
-            store_timeseries(data=data_json)
+            store_timeseries(data=data_json, multithread=multithread)
         except Exception as e:
             print(f"Error processing {ts_id}: {e}")
             return None
diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index 4a31cc4b..e6901005 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -150,8 +150,8 @@ def test_store_multi_timeseries_df():
         }
     )
     ts.store_multi_timeseries_df(df, TEST_OFFICE)
-    data1 = ts.get_timeseries(TEST_TSID_MULTI, TEST_OFFICE).json
-    data2 = ts.get_timeseries(ts_id_rev_test, TEST_OFFICE).json
+    data1 = ts.get_timeseries(TEST_TSID_MULTI, TEST_OFFICE, multithread=False).json
+    data2 = ts.get_timeseries(ts_id_rev_test, TEST_OFFICE, multithread=False).json
     assert data1["name"] == TEST_TSID_MULTI
     assert data1["office-id"] == TEST_OFFICE
     assert data1["units"] == "ft"
@@ -164,18 +164,22 @@ def test_store_multi_timeseries_df():
 
 def test_store_multi_timeseries_chunks_df():
     # test getting multi timeseries while using the chunk method as well
-    ts.store_multi_timeseries_df(data=DF_MULTI_TIMESERIES, office_id=TEST_OFFICE)
+    ts.store_multi_timeseries_df(
+        data=DF_MULTI_TIMESERIES, office_id=TEST_OFFICE, store_multithread=True
+    )
     data1 = ts.get_timeseries(
         ts_id=TEST_TSID_MULTI1,
         office_id=TEST_OFFICE,
         begin=START_DATE_CHUNK_MULTI,
         end=END_DATE_CHUNK_MULTI,
+        multithread=False,
     ).df
     data2 = ts.get_timeseries(
         ts_id=TEST_TSID_MULTI2,
         office_id=TEST_OFFICE,
         begin=START_DATE_CHUNK_MULTI,
         end=END_DATE_CHUNK_MULTI,
+        multithread=False,
     ).df
 
     pdt.assert_frame_equal(

From 43242a3c5568510afe4dc0b1297b3d9b8b7a6945 Mon Sep 17 00:00:00 2001
From: Novotny <1533907136121002@mil>
Date: Wed, 8 Oct 2025 16:06:59 -0500
Subject: [PATCH 09/14] fix test

---
 tests/cda/timeseries/timeseries_CDA_test.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index e6901005..c01725f4 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -164,9 +164,7 @@ def test_store_multi_timeseries_df():
 
 def test_store_multi_timeseries_chunks_df():
     # test getting multi timeseries while using the chunk method as well
-    ts.store_multi_timeseries_df(
-        data=DF_MULTI_TIMESERIES, office_id=TEST_OFFICE, store_multithread=True
-    )
+    ts.store_multi_timeseries_df(data=DF_MULTI_TIMESERIES, office_id=TEST_OFFICE)
     data1 = ts.get_timeseries(
         ts_id=TEST_TSID_MULTI1,
         office_id=TEST_OFFICE,

From 0e17d7d2961ba6d4bb77159a363d454773729a0d Mon Sep 17 00:00:00 2001
From: Novotny <1533907136121002@mil>
Date: Wed, 8 Oct 2025 16:49:11 -0500
Subject: [PATCH 10/14] update to have tests use 3.9.

---
 .github/workflows/CDA-testing.yml           |  8 ++++----
 tests/cda/timeseries/timeseries_CDA_test.py | 12 +++++++-----
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/.github/workflows/CDA-testing.yml b/.github/workflows/CDA-testing.yml
index e214e0c0..eb17ad52 100644
--- a/.github/workflows/CDA-testing.yml
+++ b/.github/workflows/CDA-testing.yml
@@ -2,9 +2,9 @@ name: CI
 
 on:
   push:
-    branches: [ main, githubAction-testing ]
+    branches: [main, githubAction-testing]
   pull_request:
-    branches: [ main ]
+    branches: [main]
   workflow_dispatch:
 
 jobs:
@@ -20,7 +20,7 @@ jobs:
       - name: Set Up Python
         uses: actions/setup-python@v6
         with:
-          python-version: '3.x'
+          python-version: '3.9.X'
 
       # Unlike the code-check workflow, this job requires the dev dependencies to be
      # installed to make sure we have the necessary, tools, stub files, etc.
@@ -62,4 +62,4 @@
         with:
           file: ./code-coverage-results.md
           vars: |-
-            empty: empty
\ No newline at end of file
+            empty: empty
diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index c01725f4..7cedcec1 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -179,14 +179,15 @@ def test_store_multi_timeseries_chunks_df():
         end=END_DATE_CHUNK_MULTI,
         multithread=False,
     ).df
-
+    df_multi1 = DF_MULTI_TIMESERIES1[["date-time", "value", "quality_code"]]
+    df_multi2 = DF_MULTI_TIMESERIES2[["date-time", "value", "quality_code"]]
     pdt.assert_frame_equal(
-        data1, DF_MULTI_TIMESERIES1
-    ), f"Data frames do not match: original = {DF_MULTI_TIMESERIES1.describe()}, stored = {data1.describe()}"
+        data1, df_multi1
+    ), f"Data frames do not match: original = {df_multi1.describe()}, stored = {data1.describe()}"
 
     pdt.assert_frame_equal(
-        data2, DF_MULTI_TIMESERIES2
-    ), f"Data frames do not match: original = {DF_MULTI_TIMESERIES2.describe()}, stored = {data2.describe()}"
+        data2, df_multi2
+    ), f"Data frames do not match: original = {df_multi2.describe()}, stored = {data2.describe()}"
 
 
 def test_get_multi_timeseries_chunk_df():
@@ -195,6 +196,7 @@ def test_get_multi_timeseries_chunk_df():
         office_id=TEST_OFFICE,
         begin=START_DATE_CHUNK_MULTI,
         end=END_DATE_CHUNK_MULTI,
+        melted=True,
     )
     assert df is not None, "Returned DataFrame is None"
     assert not df.empty, "Returned DataFrame is empty"

From 443c3ca086d7b910693b006e1e8eaa54acc5807a Mon Sep 17 00:00:00 2001
From: Novotny <1533907136121002@mil>
Date: Wed, 8 Oct 2025 16:58:08 -0500
Subject: [PATCH 11/14] fix test

---
 tests/cda/timeseries/timeseries_CDA_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index 7cedcec1..8682002f 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -179,8 +179,8 @@ def test_store_multi_timeseries_chunks_df():
         end=END_DATE_CHUNK_MULTI,
         multithread=False,
     ).df
-    df_multi1 = DF_MULTI_TIMESERIES1[["date-time", "value", "quality_code"]]
-    df_multi2 = DF_MULTI_TIMESERIES2[["date-time", "value", "quality_code"]]
+    df_multi1 = DF_MULTI_TIMESERIES1[["date-time", "value", "quality-code"]]
+    df_multi2 = DF_MULTI_TIMESERIES2[["date-time", "value", "quality-code"]]
     pdt.assert_frame_equal(
         data1, df_multi1
     ), f"Data frames do not match: original = {df_multi1.describe()}, stored = {data1.describe()}"

From 91e95ac8d7f23ed1f6d44e8a279391a944ba2ccc Mon Sep 17 00:00:00 2001
From: Novotny <1533907136121002@mil>
Date: Wed, 8 Oct 2025 17:09:02 -0500
Subject: [PATCH 12/14] fix

---
 tests/cda/timeseries/timeseries_CDA_test.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index 8682002f..176c6978 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -200,12 +200,6 @@ def test_get_multi_timeseries_chunk_df():
     )
     assert df is not None, "Returned DataFrame is None"
     assert not df.empty, "Returned DataFrame is empty"
-    assert any(
-        TEST_TSID_MULTI1 in str(col) for col in df.columns
-    ), f"{TEST_TSID_MULTI1} not found in DataFrame columns"
-    assert any(
-        TEST_TSID_MULTI2 in str(col) for col in df.columns
-    ), f"{TEST_TSID_MULTI2} not found in DataFrame columns"
 
     pdt.assert_frame_equal(
         df, DF_MULTI_TIMESERIES

From 2abeefc940428521dbb4c1e3e617f3c1d56390f5 Mon Sep 17 00:00:00 2001
From: Eric Novotny
Date: Thu, 9 Oct 2025 09:05:41 -0700
Subject: [PATCH 13/14] fix test

---
 tests/cda/timeseries/timeseries_CDA_test.py | 46 +++++++++++++++------
 1 file changed, 33 insertions(+), 13 deletions(-)

diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index 176c6978..1afe21a2 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -17,9 +17,21 @@
 TEST_TSID_STORE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Store"
 TEST_TSID_CHUNK_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Chunk"
 TEST_TSID_DELETE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Delete"
+TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
 # Generate 15-minute interval timestamps
 START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, tzinfo=timezone.utc)
 END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc)
+TSIDS = [
+    TS_ID_REV_TEST,
+    TEST_TSID,
+    TEST_TSID_MULTI,
+    TEST_TSID_MULTI1,
+    TEST_TSID_MULTI2,
+    TEST_TSID_STORE,
+    TEST_TSID_CHUNK_MULTI,
+]
+
+
 DT_CHUNK_MULTI = pd.date_range(
     start=START_DATE_CHUNK_MULTI,
     end=END_DATE_CHUNK_MULTI,
@@ -55,7 +67,9 @@
     }
 )
 
-DF_MULTI_TIMESERIES = pd.concat([DF_MULTI_TIMESERIES1, DF_MULTI_TIMESERIES2])
+DF_MULTI_TIMESERIES = pd.concat(
+    [DF_MULTI_TIMESERIES1, DF_MULTI_TIMESERIES2]
+).reset_index(drop=True)
 
 
 @pytest.fixture(scope="module", autouse=True)
@@ -78,6 +92,13 @@ def setup_data():
     cwms.store_location(location)
 
     yield
+    for ts_id in TSIDS:
+        try:
+            cwms.delete_timeseries_identifier(
+                ts_id=ts_id, office_id=TEST_OFFICE, delete_method="DELETE_ALL"
+            )
+        except Exception as e:
+            print(f"Failed to delete tsid {ts_id}: {e}")
     cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True)
 
 
@@ -139,24 +160,24 @@ def test_timeseries_df_to_json():
 
 def test_store_multi_timeseries_df():
     now = datetime.now(timezone.utc).replace(microsecond=0)
-    ts_id_rev_test = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
+    TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
     df = pd.DataFrame(
         {
             "date-time": [now, now],
             "value": [7, 8],
             "quality-code": [0, 0],
-            "ts_id": [TEST_TSID_MULTI, ts_id_rev_test],
+            "ts_id": [TEST_TSID_MULTI, TS_ID_REV_TEST],
             "units": ["ft", "ft"],
         }
     )
     ts.store_multi_timeseries_df(df, TEST_OFFICE)
     data1 = ts.get_timeseries(TEST_TSID_MULTI, TEST_OFFICE, multithread=False).json
-    data2 = ts.get_timeseries(ts_id_rev_test, TEST_OFFICE, multithread=False).json
+    data2 = ts.get_timeseries(TS_ID_REV_TEST, TEST_OFFICE, multithread=False).json
     assert data1["name"] == TEST_TSID_MULTI
     assert data1["office-id"] == TEST_OFFICE
     assert data1["units"] == "ft"
     assert data1["values"][0][1] == pytest.approx(7)
-    assert data2["name"] == ts_id_rev_test
+    assert data2["name"] == TS_ID_REV_TEST
     assert data2["office-id"] == TEST_OFFICE
     assert data2["units"] == "ft"
     assert data2["values"][0][1] == pytest.approx(8)
@@ -226,16 +247,16 @@ def test_get_multi_timeseries_chunk_df():
 
 
 def test_get_multi_timeseries_df():
-    ts_id_rev_test = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
-    df = ts.get_multi_timeseries_df([TEST_TSID_MULTI, ts_id_rev_test], TEST_OFFICE)
+    TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
+    df = ts.get_multi_timeseries_df([TEST_TSID_MULTI, TS_ID_REV_TEST], TEST_OFFICE)
     assert df is not None, "Returned DataFrame is None"
     assert not df.empty, "Returned DataFrame is empty"
     assert any(
         TEST_TSID_MULTI in str(col) for col in df.columns
     ), f"{TEST_TSID_MULTI} not found in DataFrame columns"
     assert any(
-        ts_id_rev_test in str(col) for col in df.columns
-    ), f"{ts_id_rev_test} not found in DataFrame columns"
+        TS_ID_REV_TEST in str(col) for col in df.columns
+    ), f"{TS_ID_REV_TEST} not found in DataFrame columns"
 
 
 def test_store_timeseries_chunk_ts():
@@ -267,7 +288,6 @@ def test_store_timeseries_chunk_ts():
 
 
 def test_read_timeseries_chunk_ts():
-    # Capture the log output
 
     data_multithread = ts.get_timeseries(
         ts_id=TEST_TSID_CHUNK_MULTI,
@@ -296,10 +316,10 @@ def test_read_timeseries_chunk_ts():
 
 
 def test_delete_timeseries():
-    ts_id_rev_test = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
+    TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
     now = datetime.now(timezone.utc).replace(microsecond=0)
     begin = now - timedelta(minutes=15)
     end = now + timedelta(minutes=15)
-    ts.delete_timeseries(ts_id_rev_test, TEST_OFFICE, begin, end)
-    result = ts.get_timeseries(ts_id_rev_test, TEST_OFFICE)
+    ts.delete_timeseries(TS_ID_REV_TEST, TEST_OFFICE, begin, end)
+    result = ts.get_timeseries(TS_ID_REV_TEST, TEST_OFFICE)
     assert result is None or result.json.get("values", []) == []

From dd7308b6867185417a1097ca115269b4638e2531 Mon Sep 17 00:00:00 2001
From: Eric Novotny
Date: Thu, 9 Oct 2025 09:17:16 -0700
Subject: [PATCH 14/14] fix error message

---
 cwms/timeseries/timeseries.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 14e5bc3b..809d2005 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -148,7 +148,6 @@ def get_timeseries_chunk(
     selector: str, endpoint: str, param: Dict[str, Any], begin: datetime, end: datetime
 ) -> Data:
-
     param["begin"] = begin.isoformat() if begin else None
     param["end"] = end.isoformat() if end else None
     response = api.get_with_paging(selector=selector, endpoint=endpoint, params=param)
     return Data(response, selector=selector)
@@ -162,7 +161,6 @@ def fetch_timeseries_chunks(
     endpoint: str,
     max_workers: int,
 ) -> List[Data]:
-
     # Initialize an empty list to store results
     results = []
 
@@ -191,7 +189,7 @@ def fetch_timeseries_chunks(
                 chunk_start, chunk_end = future_to_chunk[future]
                 # Log or handle any errors that occur during execution
                 logging.error(
-                    f"ERROR: Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
+                    f"Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
                 )
     return results
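
Reviewer note (not part of the patch series): the sketch below shows how the chunked, multithreaded read/write path that these commits converge on might be exercised end to end. The function signatures — timeseries_df_to_json, store_timeseries(..., multithread=..., chunk_size=...), and get_timeseries(..., max_days_per_chunk=..., multithread=...) — are taken from the diffs and tests above; the office ID, location name, and date range are placeholder values, not real CDA data.

    # A minimal usage sketch, assuming the package layout shown in the diffs.
    # "SWT" and the location/timeseries id are placeholders.
    from datetime import datetime, timezone

    import pandas as pd

    import cwms.timeseries.timeseries as ts

    office = "SWT"  # placeholder office id
    ts_id = "MyLoc.Stage.Inst.15Minutes.0.Raw"  # placeholder timeseries id

    begin = datetime(2025, 7, 1, tzinfo=timezone.utc)
    end = datetime(2025, 9, 30, tzinfo=timezone.utc)

    # Build a 15-minute series and store it in two-week chunks across threads
    # (chunk_size is in values: 2 weeks * 7 days * 24 hours * 4 values/hour).
    times = pd.date_range(start=begin, end=end, freq="15min")
    df = pd.DataFrame(
        {
            "date-time": times,
            "value": [1.0] * len(times),
            "quality-code": [0] * len(times),
        }
    )
    ts_json = ts.timeseries_df_to_json(df, ts_id, "ft", office)
    ts.store_timeseries(ts_json, multithread=True, chunk_size=2 * 7 * 24 * 4)

    # Read it back: the request is split into 14-day chunks fetched in
    # parallel; multithread=False falls back to a single paged GET.
    data = ts.get_timeseries(
        ts_id=ts_id,
        office_id=office,
        begin=begin,
        end=end,
        max_days_per_chunk=14,
        multithread=True,
    )
    print(data.df.head())

Two behaviors from the series worth noting when trying this: get_timeseries clamps the worker count to at least 1 (patch 03), and chunk fetch/store failures are logged via the logging module rather than printed (patches 01 and 14).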