diff --git a/cwms/__init__.py b/cwms/__init__.py
index 12870b40..e2d9f159 100644
--- a/cwms/__init__.py
+++ b/cwms/__init__.py
@@ -2,6 +2,7 @@
 
 from cwms.api import *
 from cwms.catalog.catalog import *
+from cwms.datafile_imports.shef_critfile_import import *
 from cwms.forecast.forecast_instance import *
 from cwms.forecast.forecast_spec import *
 from cwms.levels.location_levels import *
diff --git a/cwms/datafile_imports/shef_critfile_import.py b/cwms/datafile_imports/shef_critfile_import.py
new file mode 100644
index 00000000..b0b5593c
--- /dev/null
+++ b/cwms/datafile_imports/shef_critfile_import.py
@@ -0,0 +1,126 @@
+import re
+from typing import Dict, List
+
+import pandas as pd
+
+from cwms.timeseries.timeseries import (
+    timeseries_group_df_to_json,
+    update_timeseries_groups,
+)
+
+
+def import_critfile_to_ts_group(
+    file_path: str,
+    office_id: str,
+    group_id: str = "SHEF Data Acquisition",
+    category_id: str = "Data Acquisition",
+    group_office_id: str = "CWMS",
+    replace_assigned_ts: bool = False,
+) -> None:
+    """
+    Processes a .crit file and saves the information to the SHEF Data Acquisition time series group.
+
+    Parameters
+    ----------
+    file_path : str
+        Path to the .crit file.
+    office_id : str
+        The ID of the office associated with the specified timeseries.
+    group_id : str, optional
+        The specified group associated with the timeseries data. Defaults to "SHEF Data Acquisition".
+    category_id : str, optional
+        The category ID that contains the timeseries group. Defaults to "Data Acquisition".
+    group_office_id : str, optional
+        The specified office group associated with the timeseries data. Defaults to "CWMS".
+    replace_assigned_ts : bool, optional
+        Specifies whether to unassign all existing time series before assigning new time series specified in the content body. Default is False.
+
+    Returns
+    -------
+    None
+    """
+
+    def parse_crit_file(file_path: str) -> List[Dict[str, str]]:
+        """
+        Parses a .crit file into a dictionary containing timeseries ID and Alias.
+
+        Parameters
+        ----------
+        file_path : str
+            Path to the .crit file.
+
+        Returns
+        -------
+        List[Dict[str, str]]
+            A list of dictionaries with "Alias" and "Timeseries ID" as keys.
+        """
+        parsed_data = []
+        with open(file_path, "r") as file:
+            for line in file:
+                # Ignore comment lines and empty lines
+                if line.startswith("#") or not line.strip():
+                    continue
+
+                # Extract alias, timeseries ID, and TZ
+                match = re.match(r"([^=]+)=([^;]+);(.+)", line.strip())
+
+                if match:
+                    alias = match.group(1).strip()
+                    timeseries_id = match.group(2).strip()
+                    alias2 = match.group(3).strip()
+
+                    parsed_data.append(
+                        {
+                            "Alias": alias + ":" + alias2,
+                            "Timeseries ID": timeseries_id,
+                        }
+                    )
+
+        return parsed_data
+
+    def append_df(
+        df: pd.DataFrame, office_id: str, ts_id: str, alias: str
+    ) -> pd.DataFrame:
+        """
+        Appends a row to the DataFrame.
+
+        Parameters
+        ----------
+        df : pandas.DataFrame
+            The DataFrame to append to.
+        office_id : str
+            The ID of the office associated with the specified timeseries.
+        ts_id : str
+            The timeseries ID from the file.
+        alias : str
+            The alias from the file.
+        Returns
+        -------
+        pandas.DataFrame
+            The updated DataFrame.
+        """
+        data = {
+            "office-id": [office_id],
+            "timeseries-id": [ts_id],
+            "alias-id": [alias],
+        }
+        df = pd.concat([df, pd.DataFrame(data)])
+        return df
+
+    # Parse the file and get the parsed data
+    parsed_data = parse_crit_file(file_path)
+
+    df = pd.DataFrame()
+    for data in parsed_data:
+        # Create DataFrame for the current row
+        df = append_df(df, office_id, data["Timeseries ID"], data["Alias"])
+
+    # Generate JSON dictionary
+    json_dict = timeseries_group_df_to_json(df, group_id, group_office_id, category_id)
+
+    update_timeseries_groups(
+        group_id=group_id,
+        office_id=office_id,
+        replace_assigned_ts=replace_assigned_ts,
+        data=json_dict,
+    )
diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index f58deb62..e82a21f9 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -9,6 +9,121 @@
 from cwms.cwms_types import JSON, Data
 
 
+def update_timeseries_groups(
+    data: JSON,
+    group_id: str,
+    office_id: str,
+    replace_assigned_ts: Optional[bool] = False,
+) -> None:
+    """
+    Updates the timeseries groups with the provided group ID and office ID.
+
+    Parameters
+    ----------
+    data: JSON dictionary
+        Time Series data to be stored.
+    group_id : str
+        The ID of the timeseries group to update.
+    office_id : str
+        The ID of the office associated with the specified timeseries.
+    replace_assigned_ts : bool, optional
+        Specifies whether to unassign all existing time series before assigning new time series specified in the content body. Default is False.
+
+    Returns
+    -------
+    None
+    """
+    if not group_id:
+        raise ValueError("Cannot update a timeseries group without a group id")
+    if not office_id:
+        raise ValueError("Cannot update a timeseries group without an office id")
+
+    endpoint = f"timeseries/group/{group_id}"
+    params = {
+        "replace-assigned-ts": replace_assigned_ts,
+        "office": office_id,
+    }
+
+    api.patch(endpoint=endpoint, data=data, params=params, api_version=1)
+
+
+def timeseries_group_df_to_json(
+    data: pd.DataFrame,
+    group_id: str,
+    office_id: str,
+    category_id: str,
+) -> JSON:
+    """
+    Converts a dataframe to a json dictionary in the correct format.
+
+    Parameters
+    ----------
+    data: pd.DataFrame
+        Dataframe containing timeseries information.
+    group_id: str
+        The group ID for the timeseries.
+    office_id: str
+        The ID of the office associated with the specified timeseries.
+    category_id: str
+        The ID of the category associated with the group
+
+    Returns
+    -------
+    JSON
+        JSON dictionary of the timeseries data.
+    """
+    df = data.copy()
+    required_columns = ["office-id", "timeseries-id"]
+    optional_columns = ["alias-id", "attribute", "ts-code"]
+    for column in required_columns:
+        if column not in df.columns:
+            raise TypeError(
+                f"{column} is a required column in data when posting as a dataframe"
+            )
+
+    if df[required_columns].isnull().any().any():
+        raise ValueError(
+            f"Null/NaN values found in required columns: {required_columns}."
+        )
+
+    # Fill optional columns with default values if missing
+    if "alias-id" not in df.columns:
+        df["alias-id"] = None
+    if "attribute" not in df.columns:
+        df["attribute"] = 0
+
+    # Replace NaN with None for optional columns
+    for column in optional_columns:
+        if column in df.columns:
+            df[column] = df[column].where(pd.notnull(df[column]), None)
+
+    # Build the list of time-series entries
+    assigned_time_series = df.apply(
+        lambda entry: {
+            "office-id": entry["office-id"],
+            "timeseries-id": entry["timeseries-id"],
+            "alias-id": entry["alias-id"],
+            "attribute": entry["attribute"],
+            **(
+                {"ts-code": entry["ts-code"]}
+                if "ts-code" in entry and pd.notna(entry["ts-code"])
+                else {}
+            ),
+        },
+        axis=1,
+    ).tolist()
+
+    # Construct the final JSON dictionary
+    json_dict = {
+        "office-id": office_id,
+        "id": group_id,
+        "time-series-category": {"office-id": office_id, "id": category_id},
+        "assigned-time-series": assigned_time_series,
+    }
+
+    return json_dict
+
+
 def get_timeseries_group(group_id: str, category_id: str, office_id: str) -> Data:
     """Retreives time series stored in the requested time series group
 
diff --git a/tests/timeseries/timeseries_test.py b/tests/timeseries/timeseries_test.py
index 6f05b033..2ca21f76 100644
--- a/tests/timeseries/timeseries_test.py
+++ b/tests/timeseries/timeseries_test.py
@@ -29,6 +29,77 @@ def init_session():
     cwms.api.init_session(api_root=_MOCK_ROOT)
 
 
+def test_update_timeseries_groups(requests_mock):
+    group_id = "USGS TS Data Acquisition"
+    office_id = "CWMS"
+    replace_assigned_ts = True
+    data = _TS_GROUP
+
+    requests_mock.patch(
+        f"{_MOCK_ROOT}/timeseries/group/USGS%20TS%20Data%20Acquisition?replace-assigned-ts=True&office=CWMS",
+        status_code=200,
+    )
+
+    timeseries.update_timeseries_groups(
+        data=data,
+        group_id=group_id,
+        office_id=office_id,
+        replace_assigned_ts=replace_assigned_ts,
+    )
+
+    assert requests_mock.called
+    assert requests_mock.call_count == 1
+
+
+def test_timeseries_group_df_to_json_valid_data():
+    data = pd.DataFrame(
+        {
+            "office-id": ["office123", "office456"],
+            "timeseries-id": ["ts1", "ts2"],
+            "alias-id": [None, "alias2"],
+            "attribute": [0, 10],
+            "ts-code": ["code1", None],
+        }
+    )
+
+    # Clean DataFrame by removing NaN from required columns and fix optional ones
+    required_columns = ["office-id", "timeseries-id"]
+    data = data.dropna(subset=required_columns)
+    optional_columns = ["alias-id", "ts-code"]
+    for col in optional_columns:
+        if col in data.columns:
+            data[col] = data[col].where(pd.notnull(data[col]), None)
+
+    expected_json = {
+        "office-id": "office123",
+        "id": "group123",
+        "time-series-category": {
+            "office-id": "office123",
+            "id": "cat123",
+        },
+        "assigned-time-series": [
+            {
+                "office-id": "office123",
+                "timeseries-id": "ts1",
+                "alias-id": None,
+                "attribute": 0,
+                "ts-code": "code1",
+            },
+            {
+                "office-id": "office456",
+                "timeseries-id": "ts2",
+                "alias-id": "alias2",
+                "attribute": 10,
+            },
+        ],
+    }
+
+    result = timeseries.timeseries_group_df_to_json(
+        data, "group123", "office123", "cat123"
+    )
+    assert result == expected_json
+
+
 def test_timeseries_df_to_json():
     test_json = {
         "name": "TestLoc.Stage.Inst.1Hour.0.Testing",