diff --git a/.github/workflows/CDA-testing.yml b/.github/workflows/CDA-testing.yml
index e214e0c0..eb17ad52 100644
--- a/.github/workflows/CDA-testing.yml
+++ b/.github/workflows/CDA-testing.yml
@@ -2,9 +2,9 @@ name: CI
 
 on:
   push:
-    branches: [ main, githubAction-testing ]
+    branches: [main, githubAction-testing]
   pull_request:
-    branches: [ main ]
+    branches: [main]
   workflow_dispatch:
 
 jobs:
@@ -20,7 +20,7 @@ jobs:
      - name: Set Up Python
        uses: actions/setup-python@v6
        with:
-          python-version: '3.x'
+          python-version: '3.9.X'
 
      # Unlike the code-check workflow, this job requires the dev dependencies to be
      # installed to make sure we have the necessary, tools, stub files, etc.
@@ -62,4 +62,4 @@ jobs:
        with:
          file: ./code-coverage-results.md
          vars: |-
-            empty: empty
\ No newline at end of file
+            empty: empty
diff --git a/cwms/api.py b/cwms/api.py
index 022f9e6e..8cc34691 100644
--- a/cwms/api.py
+++ b/cwms/api.py
@@ -305,6 +305,25 @@ def get_with_paging(
     return response
 
 
+def _post_function(
+    endpoint: str,
+    data: Any,
+    params: Optional[RequestParams] = None,
+    *,
+    api_version: int = API_VERSION,
+) -> Any:
+
+    # POST requires different headers than GET
+    headers = {"accept": "*/*", "Content-Type": api_version_text(api_version)}
+    if isinstance(data, dict) or isinstance(data, list):
+        data = json.dumps(data)
+    with SESSION.post(endpoint, params=params, headers=headers, data=data) as response:
+        if not response.ok:
+            logging.error(f"CDA Error: response={response}")
+            raise ApiError(response)
+        return response
+
+
 def post(
     endpoint: str,
     data: Any,
@@ -329,11 +348,7 @@ def post(
     Raises:
         ApiError: If an error response is return by the API.
     """
-
-    post_with_returned_data(
-        endpoint=endpoint, data=data, params=params, api_version=api_version
-    )
-    return None
+    _post_function(endpoint=endpoint, data=data, params=params, api_version=api_version)
 
 
 def post_with_returned_data(
@@ -361,17 +376,10 @@ def post_with_returned_data(
         ApiError: If an error response is return by the API.
     """
 
-    # post requires different headers than get for
-    headers = {"accept": "*/*", "Content-Type": api_version_text(api_version)}
-
-    if isinstance(data, dict) or isinstance(data, list):
-        data = json.dumps(data)
-
-    with SESSION.post(endpoint, params=params, headers=headers, data=data) as response:
-        if not response.ok:
-            logging.error(f"CDA Error: response={response}")
-            raise ApiError(response)
-        return _process_response(response)
+    response = _post_function(
+        endpoint=endpoint, data=data, params=params, api_version=api_version
+    )
+    return _process_response(response)
 
 
 def patch(
diff --git a/cwms/projects/water_supply/accounting.py b/cwms/projects/water_supply/accounting.py
index 89fd4668..659ecace 100644
--- a/cwms/projects/water_supply/accounting.py
+++ b/cwms/projects/water_supply/accounting.py
@@ -2,6 +2,8 @@
 # United States Army Corps of Engineers - Hydrologic Engineering Center (USACE/HEC)
 # All Rights Reserved. USACE PROPRIETARY/CONFIDENTIAL.
 # Source may not be released without written approval from HEC
+from typing import Union
+
 import cwms.api as api
 from cwms.cwms_types import JSON, Data
 
@@ -74,7 +76,7 @@ def get_pump_accounting(
     endpoint = f"projects/{office_id}/{project_id}/water-user/{water_user}/contracts/{contract_name}/accounting"
 
-    params: dict[str, str | int] = {
+    params: dict[str, Union[str, int]] = {
         "start": start,
         "end": end,
         "timezone": timezone,
diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 7598e340..809d2005 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -1,4 +1,5 @@
 import concurrent.futures
+import logging
 from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, List, Optional, Tuple
 
@@ -82,7 +83,7 @@ def get_ts_ids(ts_id: str) -> Any:
             }
             return result_dict
         except Exception as e:
-            print(f"Error processing {ts_id}: {e}")
+            logging.error(f"Error processing {ts_id}: {e}")
             return None
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -144,15 +145,20 @@ def chunk_timeseries_time_range(
     return chunks
 
 
+def get_timeseries_chunk(
+    selector: str, endpoint: str, param: Dict[str, Any], begin: datetime, end: datetime
+) -> Data:
+    param["begin"] = begin.isoformat() if begin else None
+    param["end"] = end.isoformat() if end else None
+    response = api.get_with_paging(selector=selector, endpoint=endpoint, params=param)
+    return Data(response, selector=selector)
+
+
 def fetch_timeseries_chunks(
     chunks: List[Tuple[datetime, datetime]],
-    ts_id: str,
-    office_id: str,
-    unit: Optional[str],
-    datum: Optional[str],
-    page_size: Optional[int],
-    version_date: Optional[datetime],
-    trim: Optional[bool],
+    params: Dict[str, Any],
+    selector: str,
+    endpoint: str,
     max_workers: int,
 ) -> List[Data]:
     # Initialize an empty list to store results
@@ -164,15 +170,11 @@ def fetch_timeseries_chunks(
         future_to_chunk = {
             executor.submit(
                 get_timeseries_chunk,
-                ts_id,
-                office_id,
-                unit,
-                datum,
+                selector,
+                endpoint,
+                params.copy(),
                 chunk_start,
                 chunk_end,
-                page_size,
-                version_date,
-                trim,
             ): (chunk_start, chunk_end)
             for chunk_start, chunk_end in chunks
         }
@@ -184,52 +186,14 @@ def fetch_timeseries_chunks(
                 result = future.result()
                 results.append(result)
             except Exception as e:
-                # Log or handle any errors that occur during execution
                 chunk_start, chunk_end = future_to_chunk[future]
-                print(
-                    f"ERROR: Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
+                # Log or handle any errors that occur during execution
+                logging.error(
+                    f"Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
                 )
 
     return results
 
 
-def get_timeseries_chunk(
-    ts_id: str,
-    office_id: str,
-    unit: Optional[str] = "EN",
-    datum: Optional[str] = None,
-    begin: Optional[datetime] = None,
-    end: Optional[datetime] = None,
-    page_size: Optional[int] = 300000,
-    version_date: Optional[datetime] = None,
-    trim: Optional[bool] = True,
-) -> Data:
-
-    # creates the dataframe from the timeseries data
-    endpoint = "timeseries"
-    if begin and not isinstance(begin, datetime):
-        raise ValueError("begin needs to be in datetime")
-    if end and not isinstance(end, datetime):
-        raise ValueError("end needs to be in datetime")
-    if version_date and not isinstance(version_date, datetime):
-        raise ValueError("version_date needs to be in datetime")
-    params = {
-        "office": office_id,
-        "name": ts_id,
-        "unit": unit,
-        "datum": datum,
-        "begin": begin.isoformat() if begin else None,
-        "end": end.isoformat() if end else None,
-        "page-size": page_size,
-        "page": None,
-        "version-date": version_date.isoformat() if version_date else None,
-        "trim": trim,
-    }
-    selector = "values"
-
-    response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params)
-    return Data(response, selector=selector)
-
-
 def combine_timeseries_results(results: List[Data]) -> Data:
     """
     Combines the results from multiple chunks into a single cwms Data object.
@@ -348,7 +312,7 @@ def get_timeseries(
     """
 
     selector = "values"
-
+    endpoint = "timeseries"
     params = {
         "office": office_id,
         "name": ts_id,
@@ -371,17 +335,17 @@ def get_timeseries(
             # replace begin with begin extent if outside extents
             if begin < begin_extent:
                 begin = begin_extent
-                print(
-                    f"INFO: Requested begin was before any data in this timeseries. Reseting to {begin}"
+                logging.debug(
+                    f"Requested begin was before any data in this timeseries. Resetting to {begin}"
                 )
     except Exception as e:
         # If getting extents fails, fall back to single-threaded mode
-        print(
-            f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode."
+        logging.debug(
+            f"Could not retrieve time series extents ({e}). Falling back to single-threaded mode."
         )
 
         response = api.get_with_paging(
-            selector=selector, endpoint="timeseries", params=params
+            selector=selector, endpoint=endpoint, params=params
         )
         return Data(response, selector=selector)
 
@@ -389,7 +353,7 @@ def get_timeseries(
     chunks = chunk_timeseries_time_range(begin, end, timedelta(days=max_days_per_chunk))
 
     # find max worker thread
-    max_workers = min(len(chunks), max_workers)
+    max_workers = max(min(len(chunks), max_workers), 1)
 
     # if not multithread
     if max_workers == 1 or not multithread:
@@ -398,19 +362,15 @@ def get_timeseries(
         )
         return Data(response, selector=selector)
     else:
-        print(
-            f"INFO: Fetching {len(chunks)} chunks of timeseries data with {max_workers} threads"
+        logging.debug(
+            f"Fetching {len(chunks)} chunks of timeseries data with {max_workers} threads"
         )
 
     # fetch the data
     result_list = fetch_timeseries_chunks(
         chunks,
-        ts_id,
-        office_id,
-        unit,
-        datum,
-        page_size,
-        version_date,
-        trim,
+        params,
+        selector,
+        endpoint,
         max_workers,
     )
 
@@ -491,7 +451,9 @@ def timeseries_df_to_json(
 
 
 def store_multi_timeseries_df(
-    data: pd.DataFrame, office_id: str, max_workers: Optional[int] = 30
+    data: pd.DataFrame,
+    office_id: str,
+    max_workers: Optional[int] = 30,
 ) -> None:
     def store_ts_ids(
         data: pd.DataFrame,
@@ -509,7 +471,7 @@ def store_ts_ids(
                 office_id=office_id,
                 version_date=version_date,
             )
-            store_timeseries(data=data_json)
+            store_timeseries(data=data_json, multithread=multithread)
         except Exception as e:
             print(f"Error processing {ts_id}: {e}")
             return None
@@ -575,42 +537,6 @@ def chunk_timeseries_data(
     return chunk_list
 
 
-def store_timeseries_chunk(
-    data: JSON,
-    create_as_ltrs: Optional[bool] = False,
-    store_rule: Optional[str] = None,
-    override_protection: Optional[bool] = False,
-) -> None:
-    """
-    Stores a single chunk of time series data.
-
-    Parameters
-    ----------
-    chunk : list
-        A subset of time series values to be stored.
-    create_as_ltrs : bool
-        Flag indicating if timeseries should be created as Local Regular Time Series.
-    store_rule : str
-        The business rule to use when merging the incoming with existing data.
-    override_protection : bool
-        A flag to ignore the protected data quality when storing data.
-
-    Returns
-    -------
-    response
-        API response for the chunk storage.
- """ - endpoint = "timeseries" - params = { - "create-as-lrts": create_as_ltrs, - "store-rule": store_rule, - "override-protection": override_protection, - } - - # Make the API call - return api.post(endpoint, data, params) - - def store_timeseries( data: JSON, create_as_ltrs: Optional[bool] = False, @@ -667,8 +593,8 @@ def store_timeseries( return api.post(endpoint, data, params) actual_workers = min(max_workers, len(chunks)) - print( - f"INFO: Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads" + logging.debug( + f"Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads" ) # Store chunks concurrently @@ -679,11 +605,10 @@ def store_timeseries( # Submit each chunk as a separate task to the executor for chunk in chunks: future = executor.submit( - store_timeseries_chunk, # The function to execute - chunk, # The chunk of data to store - create_as_ltrs, # Whether to create as LRTS - store_rule, # The store rule to use - override_protection, # Whether to override protection + api.post, # The function to execute + endpoint, # The chunk of data to store + data, + params, ) futures.append(future) # Add the future to the list @@ -693,7 +618,9 @@ def store_timeseries( except Exception as e: start_time = chunk["values"][0][0] end_time = chunk["values"][-1][0] - print(f"Error storing chunk from {start_time} to {end_time}: {e}") + logging.error( + f"Error storing chunk from {start_time} to {end_time}: {e}" + ) responses.append({"error": str(e)}) return diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py index 0884c7aa..1afe21a2 100644 --- a/tests/cda/timeseries/timeseries_CDA_test.py +++ b/tests/cda/timeseries/timeseries_CDA_test.py @@ -12,12 +12,26 @@ TEST_LOCATION_ID = "pytest_group" TEST_TSID = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Test" TEST_TSID_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi" +TEST_TSID_MULTI1 = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-1" +TEST_TSID_MULTI2 = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-2" TEST_TSID_STORE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Store" TEST_TSID_CHUNK_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Chunk" TEST_TSID_DELETE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Delete" +TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test") # Generate 15-minute interval timestamps START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, tzinfo=timezone.utc) END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) +TSIDS = [ + TS_ID_REV_TEST, + TEST_TSID, + TEST_TSID_MULTI, + TEST_TSID_MULTI1, + TEST_TSID_MULTI2, + TEST_TSID_STORE, + TEST_TSID_CHUNK_MULTI, +] + + DT_CHUNK_MULTI = pd.date_range( start=START_DATE_CHUNK_MULTI, end=END_DATE_CHUNK_MULTI, @@ -33,6 +47,30 @@ } ) +DF_MULTI_TIMESERIES1 = pd.DataFrame( + { + "date-time": DT_CHUNK_MULTI, + "value": [86.57 + (i % 10) * 0.01 for i in range(len(DT_CHUNK_MULTI))], + "quality-code": [0] * len(DT_CHUNK_MULTI), + "ts_id": [TEST_TSID_MULTI1] * len(DT_CHUNK_MULTI), + "units": ["ft"] * len(DT_CHUNK_MULTI), + } +) + +DF_MULTI_TIMESERIES2 = pd.DataFrame( + { + "date-time": DT_CHUNK_MULTI, + "value": [86.57 + (i % 10) * 0.01 for i in range(len(DT_CHUNK_MULTI))], + "quality-code": [0] * len(DT_CHUNK_MULTI), + "ts_id": [TEST_TSID_MULTI2] * len(DT_CHUNK_MULTI), + "units": ["ft"] * len(DT_CHUNK_MULTI), + } +) + +DF_MULTI_TIMESERIES = pd.concat( + [DF_MULTI_TIMESERIES1, DF_MULTI_TIMESERIES2] +).reset_index(drop=True) + 
@pytest.fixture(scope="module", autouse=True) def setup_data(): @@ -54,6 +92,13 @@ def setup_data(): cwms.store_location(location) yield + for ts_id in TSIDS: + try: + cwms.delete_timeseries_identifier( + ts_id=ts_id, office_id=TEST_OFFICE, delete_method="DELETE_ALL" + ) + except Exception as e: + print(f"Failed to delete tsid {ts_id}: {e}") cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True) @@ -115,43 +160,87 @@ def test_timeseries_df_to_json(): def test_store_multi_timeseries_df(): now = datetime.now(timezone.utc).replace(microsecond=0) - ts_id_rev_test = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test") + TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test") df = pd.DataFrame( { "date-time": [now, now], "value": [7, 8], "quality-code": [0, 0], - "ts_id": [TEST_TSID_MULTI, ts_id_rev_test], + "ts_id": [TEST_TSID_MULTI, TS_ID_REV_TEST], "units": ["ft", "ft"], } ) ts.store_multi_timeseries_df(df, TEST_OFFICE) - data1 = ts.get_timeseries(TEST_TSID_MULTI, TEST_OFFICE).json - data2 = ts.get_timeseries(ts_id_rev_test, TEST_OFFICE).json + data1 = ts.get_timeseries(TEST_TSID_MULTI, TEST_OFFICE, multithread=False).json + data2 = ts.get_timeseries(TS_ID_REV_TEST, TEST_OFFICE, multithread=False).json assert data1["name"] == TEST_TSID_MULTI assert data1["office-id"] == TEST_OFFICE assert data1["units"] == "ft" assert data1["values"][0][1] == pytest.approx(7) - assert data2["name"] == ts_id_rev_test + assert data2["name"] == TS_ID_REV_TEST assert data2["office-id"] == TEST_OFFICE assert data2["units"] == "ft" assert data2["values"][0][1] == pytest.approx(8) +def test_store_multi_timeseries_chunks_df(): + # test getting multi timeseries while using the chunk method as well + ts.store_multi_timeseries_df(data=DF_MULTI_TIMESERIES, office_id=TEST_OFFICE) + data1 = ts.get_timeseries( + ts_id=TEST_TSID_MULTI1, + office_id=TEST_OFFICE, + begin=START_DATE_CHUNK_MULTI, + end=END_DATE_CHUNK_MULTI, + multithread=False, + ).df + data2 = ts.get_timeseries( + ts_id=TEST_TSID_MULTI2, + office_id=TEST_OFFICE, + begin=START_DATE_CHUNK_MULTI, + end=END_DATE_CHUNK_MULTI, + multithread=False, + ).df + df_multi1 = DF_MULTI_TIMESERIES1[["date-time", "value", "quality-code"]] + df_multi2 = DF_MULTI_TIMESERIES2[["date-time", "value", "quality-code"]] + pdt.assert_frame_equal( + data1, df_multi1 + ), f"Data frames do not match: original = {df_multi1.describe()}, stored = {data1.describe()}" + + pdt.assert_frame_equal( + data2, df_multi2 + ), f"Data frames do not match: original = {df_multi2.describe()}, stored = {data2.describe()}" + + +def test_get_multi_timeseries_chunk_df(): + df = ts.get_multi_timeseries_df( + ts_ids=[TEST_TSID_MULTI1, TEST_TSID_MULTI2], + office_id=TEST_OFFICE, + begin=START_DATE_CHUNK_MULTI, + end=END_DATE_CHUNK_MULTI, + melted=True, + ) + assert df is not None, "Returned DataFrame is None" + assert not df.empty, "Returned DataFrame is empty" + + pdt.assert_frame_equal( + df, DF_MULTI_TIMESERIES + ), f"Data frames do not match: original = {DF_MULTI_TIMESERIES.describe()}, stored = {df.describe()}" + + def test_get_multi_timeseries_df(): - ts_id_rev_test = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test") - df = ts.get_multi_timeseries_df([TEST_TSID_MULTI, ts_id_rev_test], TEST_OFFICE) + TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test") + df = ts.get_multi_timeseries_df([TEST_TSID_MULTI, TS_ID_REV_TEST], TEST_OFFICE) assert df is not None, "Returned DataFrame is None" assert not df.empty, "Returned DataFrame is empty" assert any( 
         TEST_TSID_MULTI in str(col) for col in df.columns
     ), f"{TEST_TSID_MULTI} not found in DataFrame columns"
     assert any(
-        ts_id_rev_test in str(col) for col in df.columns
-    ), f"{ts_id_rev_test} not found in DataFrame columns"
+        TS_ID_REV_TEST in str(col) for col in df.columns
+    ), f"{TS_ID_REV_TEST} not found in DataFrame columns"
 
 
-def test_store_timeseries_multi_chunk_ts():
+def test_store_timeseries_chunk_ts():
     # Define parameters
     ts_id = TEST_TSID_CHUNK_MULTI
     office = TEST_OFFICE
@@ -160,59 +249,40 @@
     # Convert DataFrame to JSON format
     ts_json = ts.timeseries_df_to_json(DF_CHUNK_MULTI, ts_id, units, office)
 
-    # Capture the log output
-    with patch("builtins.print") as mock_print:
-        ts.store_timeseries(ts_json, multithread=True, chunk_size=2 * 7 * 24 * 4)
-
-        # Extract the log messages
-        log_messages = [call.args[0] for call in mock_print.call_args_list]
-
-        # Find the relevant log message
-        store_log = next((msg for msg in log_messages if "INFO: Storing" in msg), None)
-        assert store_log is not None, "Expected log message not found"
-
-        # Parse the number of chunks and threads
-        chunks = int(store_log.split("chunks")[0].split()[-1])
-        threads = int(store_log.split("with")[1].split()[0])
-
-        # Assert the expected values
-        assert chunks == 5, f"Expected 5 chunks, but got {chunks}"
-        assert threads == 5, f"Expected 5 threads, but got {threads}"
+    ts.store_timeseries(ts_json, multithread=True, chunk_size=2 * 7 * 24 * 4)
+    data_multithread = ts.get_timeseries(
+        ts_id=TEST_TSID_CHUNK_MULTI,
+        office_id=TEST_OFFICE,
+        begin=START_DATE_CHUNK_MULTI,
+        end=END_DATE_CHUNK_MULTI,
+        max_days_per_chunk=14,
+        unit="SI",
+    )
+    df = data_multithread.df
+    # make sure the dataframe matches stored dataframe
+    pdt.assert_frame_equal(
+        df, DF_CHUNK_MULTI
+    ), f"Data frames do not match: original = {DF_CHUNK_MULTI.describe()}, stored = {df.describe()}"
 
 
-def test_read_timeseries_multi_chunk_ts():
+def test_read_timeseries_chunk_ts():
     # Capture the log output
-    with patch("builtins.print") as mock_print:
-        data_multithread = ts.get_timeseries(
-            ts_id=TEST_TSID_CHUNK_MULTI,
-            office_id=TEST_OFFICE,
-            begin=START_DATE_CHUNK_MULTI,
-            end=END_DATE_CHUNK_MULTI,
-            max_days_per_chunk=14,
-            unit="SI",
-        )
-
-        # Extract the log messages
-        log_messages = [call.args[0] for call in mock_print.call_args_list]
-
-        # Find the relevant log message
-        read_log = next((msg for msg in log_messages if "INFO: Fetching" in msg), None)
-        assert read_log is not None, "Expected log message not found"
-
-        # Parse the number of chunks and threads
-        chunks = int(read_log.split("chunks")[0].split()[-1])
-        threads = int(read_log.split("with")[1].split()[0])
-
-        # Assert the expected values
-        assert chunks == 5, f"Expected 5 chunks, but got {chunks}"
-        assert threads == 5, f"Expected 5 threads, but got {threads}"
+    data_multithread = ts.get_timeseries(
+        ts_id=TEST_TSID_CHUNK_MULTI,
+        office_id=TEST_OFFICE,
+        begin=START_DATE_CHUNK_MULTI,
+        end=END_DATE_CHUNK_MULTI,
+        max_days_per_chunk=14,
+        unit="SI",
+    )
 
     # Check metadata for multithreaded read
     data_json = data_multithread.json
-
     # check df values
     df = data_multithread.df.copy()
+    assert df is not None, "Returned DataFrame is None"
+    assert not df.empty, "Returned DataFrame is empty"
 
     # make sure the dataframe matches stored dataframe
     pdt.assert_frame_equal(
@@ -226,10 +296,10 @@
 def test_delete_timeseries():
-    ts_id_rev_test = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
+    TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test") now = datetime.now(timezone.utc).replace(microsecond=0) begin = now - timedelta(minutes=15) end = now + timedelta(minutes=15) - ts.delete_timeseries(ts_id_rev_test, TEST_OFFICE, begin, end) - result = ts.get_timeseries(ts_id_rev_test, TEST_OFFICE) + ts.delete_timeseries(TS_ID_REV_TEST, TEST_OFFICE, begin, end) + result = ts.get_timeseries(TS_ID_REV_TEST, TEST_OFFICE) assert result is None or result.json.get("values", []) == []