Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cwms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from cwms.api import *
from cwms.catalog.catalog import *
from cwms.datafile_imports.shef_critfile_import import *
from cwms.forecast.forecast_instance import *
from cwms.forecast.forecast_spec import *
from cwms.levels.location_levels import *
Expand Down
126 changes: 126 additions & 0 deletions cwms/datafile_imports/shef_critfile_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import re
from typing import Dict, List

import pandas as pd

from cwms.timeseries.timeseries import (
timeseries_group_df_to_json,
update_timeseries_groups,
)


def import_critfile_to_ts_group(
    file_path: str,
    office_id: str,
    group_id: str = "SHEF Data Acquisition",
    category_id: str = "Data Acquisition",
    group_office_id: str = "CWMS",
    replace_assigned_ts: bool = False,
) -> None:
    """
    Processes a .crit file and saves the information to the SHEF Data Acquisition
    time series group.

    Parameters
    ----------
    file_path : str
        Path to the .crit file.
    office_id : str
        The ID of the office associated with the specified timeseries.
    group_id : str, optional
        The specified group associated with the timeseries data. Defaults to
        "SHEF Data Acquisition".
    category_id : str, optional
        The category ID that contains the timeseries group. Defaults to
        "Data Acquisition".
    group_office_id : str, optional
        The specified office group associated with the timeseries data.
        Defaults to "CWMS".
    replace_assigned_ts : bool, optional
        Specifies whether to unassign all existing time series before assigning
        new time series specified in the content body. Default is False.

    Returns
    -------
    None
    """

    def parse_crit_file(path: str) -> List[Dict[str, str]]:
        """
        Parse a .crit file into a list of dicts with "Alias" and
        "Timeseries ID" keys.

        Lines are expected to look like ``alias=timeseries-id;extra`` where
        ``extra`` is folded into the alias as ``alias:extra``.  Blank lines,
        comment lines (starting with "#", even when indented), and lines that
        do not match the expected pattern are skipped.
        """
        # Compile once instead of re-parsing the pattern string on every line.
        pattern = re.compile(r"([^=]+)=([^;]+);(.+)")
        entries: List[Dict[str, str]] = []
        with open(path, "r") as crit_file:
            for raw_line in crit_file:
                line = raw_line.strip()
                # Strip before the comment check so indented comments are
                # skipped too (the original only caught comments at column 0).
                if not line or line.startswith("#"):
                    continue
                match = pattern.match(line)
                if match is None:
                    continue
                alias, timeseries_id, alias_suffix = (
                    group.strip() for group in match.groups()
                )
                entries.append(
                    {
                        "Alias": f"{alias}:{alias_suffix}",
                        "Timeseries ID": timeseries_id,
                    }
                )
        return entries

    parsed_data = parse_crit_file(file_path)

    # Build the DataFrame in one pass; the original concatenated one
    # single-row frame per entry, which is O(n^2) in the number of rows.
    df = pd.DataFrame(
        [
            {
                "office-id": office_id,
                "timeseries-id": entry["Timeseries ID"],
                "alias-id": entry["Alias"],
            }
            for entry in parsed_data
        ]
    )

    # Generate the JSON dictionary and push it to the group endpoint.
    json_dict = timeseries_group_df_to_json(df, group_id, group_office_id, category_id)

    update_timeseries_groups(
        group_id=group_id,
        office_id=office_id,
        replace_assigned_ts=replace_assigned_ts,
        data=json_dict,
    )
115 changes: 115 additions & 0 deletions cwms/timeseries/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,121 @@
from cwms.cwms_types import JSON, Data


def update_timeseries_groups(
    data: JSON,
    group_id: str,
    office_id: str,
    replace_assigned_ts: Optional[bool] = False,
) -> None:
    """
    Updates the timeseries group identified by the provided group ID and office ID.

    Parameters
    ----------
    data : JSON dictionary
        Time series group data to be stored.
    group_id : str
        The ID of the timeseries group to update.
    office_id : str
        The ID of the office associated with the specified timeseries group.
    replace_assigned_ts : bool, optional
        Specifies whether to unassign all existing time series before assigning
        new time series specified in the content body. Default is False.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If `group_id` or `office_id` is empty.
    """
    # Fix: the original messages said "specified level" — copy-pasted from the
    # location-levels module; this function operates on timeseries groups.
    if not group_id:
        raise ValueError("Cannot update a timeseries group without a group id")
    if not office_id:
        raise ValueError("Cannot update a timeseries group without an office id")

    endpoint = f"timeseries/group/{group_id}"
    params = {
        "replace-assigned-ts": replace_assigned_ts,
        "office": office_id,
    }

    api.patch(endpoint=endpoint, data=data, params=params, api_version=1)


def timeseries_group_df_to_json(
    data: pd.DataFrame,
    group_id: str,
    office_id: str,
    category_id: str,
) -> JSON:
    """
    Converts a dataframe to a JSON dictionary in the format expected by the
    timeseries group endpoints.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe containing timeseries information.  Must contain the columns
        "office-id" and "timeseries-id"; "alias-id", "attribute" and "ts-code"
        are optional.  The input dataframe is not modified.
    group_id : str
        The group ID for the timeseries.
    office_id : str
        The ID of the office associated with the specified timeseries.
    category_id : str
        The ID of the category associated with the group.

    Returns
    -------
    JSON
        JSON dictionary of the timeseries group data.

    Raises
    ------
    TypeError
        If a required column is missing from `data`.
    ValueError
        If a required column contains null/NaN values.
    """
    df = data.copy()
    required_columns = ["office-id", "timeseries-id"]
    optional_columns = ["alias-id", "attribute", "ts-code"]
    for column in required_columns:
        if column not in df.columns:
            raise TypeError(
                f"{column} is a required column in data when posting as a dataframe"
            )

    if df[required_columns].isnull().any().any():
        raise ValueError(
            f"Null/NaN values found in required columns: {required_columns}. "
        )

    # Fill optional columns with default values if missing
    if "alias-id" not in df.columns:
        df["alias-id"] = None
    if "attribute" not in df.columns:
        df["attribute"] = 0

    # Replace NaN with None for optional columns.  Bug fix: the original wrote
    # the cleaned values back into the caller's `data` instead of the working
    # copy `df`, mutating the input AND leaving NaN in the generated JSON.
    # The astype(object) cast is needed because pandas coerces None back to
    # NaN when assigning into a numeric-dtype column.
    for column in optional_columns:
        if column in df.columns:
            df[column] = df[column].astype(object).where(df[column].notna(), None)

    # Build the list of time-series entries; "ts-code" is emitted only when
    # present and non-null.
    assigned_time_series = df.apply(
        lambda entry: {
            "office-id": entry["office-id"],
            "timeseries-id": entry["timeseries-id"],
            "alias-id": entry["alias-id"],
            "attribute": entry["attribute"],
            **(
                {"ts-code": entry["ts-code"]}
                if "ts-code" in entry and pd.notna(entry["ts-code"])
                else {}
            ),
        },
        axis=1,
    ).tolist()

    # Construct the final JSON dictionary
    json_dict = {
        "office-id": office_id,
        "id": group_id,
        "time-series-category": {"office-id": office_id, "id": category_id},
        "assigned-time-series": assigned_time_series,
    }

    return json_dict


def get_timeseries_group(group_id: str, category_id: str, office_id: str) -> Data:
"""Retreives time series stored in the requested time series group

Expand Down
71 changes: 71 additions & 0 deletions tests/timeseries/timeseries_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,77 @@ def init_session():
cwms.api.init_session(api_root=_MOCK_ROOT)


def test_update_timeseries_groups(requests_mock):
    # Register the exact PATCH URL the client is expected to hit.
    endpoint = (
        f"{_MOCK_ROOT}/timeseries/group/USGS%20TS%20Data%20Acquisition"
        "?replace-assigned-ts=True&office=CWMS"
    )
    requests_mock.patch(endpoint, status_code=200)

    timeseries.update_timeseries_groups(
        data=_TS_GROUP,
        group_id="USGS TS Data Acquisition",
        office_id="CWMS",
        replace_assigned_ts=True,
    )

    # Exactly one request should have been issued against the mock.
    assert requests_mock.called
    assert requests_mock.call_count == 1


def test_timeseries_group_df_to_json_valid_data():
    data = pd.DataFrame(
        {
            "office-id": ["office123", "office456"],
            "timeseries-id": ["ts1", "ts2"],
            "alias-id": [None, "alias2"],
            "attribute": [0, 10],
            "ts-code": ["code1", None],
        }
    )

    # Drop rows missing required fields and normalize NaN -> None in the
    # optional columns before handing the frame to the converter.
    data = data.dropna(subset=["office-id", "timeseries-id"])
    for col in ("alias-id", "ts-code"):
        if col in data.columns:
            data[col] = data[col].where(pd.notnull(data[col]), None)

    expected_json = {
        "office-id": "office123",
        "id": "group123",
        "time-series-category": {
            "office-id": "office123",
            "id": "cat123",
        },
        "assigned-time-series": [
            {
                "office-id": "office123",
                "timeseries-id": "ts1",
                "alias-id": None,
                "attribute": 0,
                # ts-code is only emitted when non-null, so row two omits it.
                "ts-code": "code1",
            },
            {
                "office-id": "office456",
                "timeseries-id": "ts2",
                "alias-id": "alias2",
                "attribute": 10,
            },
        ],
    }

    result = timeseries.timeseries_group_df_to_json(
        data, "group123", "office123", "cat123"
    )
    assert result == expected_json


def test_timeseries_df_to_json():
test_json = {
"name": "TestLoc.Stage.Inst.1Hour.0.Testing",
Expand Down
Loading