Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cwms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from cwms.ratings.ratings_spec import *
from cwms.ratings.ratings_template import *
from cwms.standard_text.standard_text import *
from cwms.timeseries.critscript import *
from cwms.timeseries.timerseries_identifier import *
from cwms.timeseries.timeseries import *
from cwms.timeseries.timeseries_bin import *
Expand Down
127 changes: 127 additions & 0 deletions cwms/timeseries/critscript.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import re
from typing import Dict, List

import pandas as pd

import cwms


def crit_script(
    file_path: str,
    office_id: str,
    group_id: str = "SHEF Data Acquisition",
    category_id: str = "Data Aquisition",
    group_office_id: str = "CWMS",
) -> None:
    """
    Process a .crit file and assign its timeseries to a CWMS timeseries group.

    Parses alias/timeseries-ID pairs out of the file, builds a DataFrame of
    group assignments, converts it to the JSON structure expected by CDA, and
    patches the group via ``cwms.update_timeseries_groups``.

    Parameters
    ----------
    file_path : str
        Path to the .crit file.
    office_id : str
        The ID of the office associated with the specified timeseries.
    group_id : str, optional
        The timeseries group the parsed timeseries are assigned to.
        Defaults to "SHEF Data Acquisition".
    category_id : str, optional
        The category ID that contains the timeseries group. Defaults to
        "Data Aquisition".
        NOTE(review): the default is misspelled ("Aquisition"); confirm the
        server-side category name before correcting it, since the value must
        match an existing category.
    group_office_id : str, optional
        The office that owns the group/category. Defaults to "CWMS".

    Returns
    -------
    None
    """

    def parse_crit_file(path: str) -> List[Dict[str, str]]:
        """Parse a .crit file into alias / timeseries-ID records."""
        records: List[Dict[str, str]] = []
        # Expected data-line shape:  ALIAS=Timeseries.Id;SECOND_ALIAS
        pattern = re.compile(r"([^=]+)=([^;]+);(.+)")
        with open(path, "r") as fh:
            for raw in fh:
                line = raw.strip()
                # Skip comment lines and empty lines.
                if not line or line.startswith("#"):
                    continue
                match = pattern.match(line)
                if match:
                    alias, ts_id, alias2 = (g.strip() for g in match.groups())
                    records.append(
                        {
                            "Alias": f"{alias}:{alias2}",
                            "Timeseries ID": ts_id,
                        }
                    )
        return records

    parsed_data = parse_crit_file(file_path)

    # Build the assignment DataFrame in one shot instead of concatenating
    # row-by-row inside a loop (which is quadratic in the number of rows).
    df = pd.DataFrame(
        {
            "officeId": [office_id] * len(parsed_data),
            "timeseriesId": [d["Timeseries ID"] for d in parsed_data],
            "aliasId": [d["Alias"] for d in parsed_data],
            "tsCode": ["none"] * len(parsed_data),  # default value for ts-code
            "attribute": [0] * len(parsed_data),  # default value for attribute
        }
    )

    # Generate the JSON dictionary for the group patch.
    json_dict = cwms.timeseries_group_df_to_json(
        df, group_id, group_office_id, category_id
    )

    cwms.update_timeseries_groups(
        group_id=group_id,
        office_id=office_id,
        replace_assigned_ts=None,
        data=json_dict,
    )
114 changes: 114 additions & 0 deletions cwms/timeseries/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,120 @@
from cwms.cwms_types import JSON, Data


def update_timeseries_groups(
    group_id: str,
    office_id: str,
    replace_assigned_ts: Optional[bool],
    data: JSON,
) -> None:
    """
    Update (PATCH) a timeseries group's assigned time series.

    Parameters
    ----------
    group_id : str
        The ID of the timeseries group to update.
    office_id : str
        The ID of the office associated with the group.
    replace_assigned_ts : bool, optional
        Whether to unassign all existing time series before assigning the
        time series specified in ``data``. Pass ``None`` to omit the query
        parameter and use the server-side default.
    data : JSON dictionary
        The group definition to store (see ``timeseries_group_df_to_json``).

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If ``group_id`` or ``office_id`` is empty.
    """
    if not group_id:
        raise ValueError("Cannot update a timeseries group without a group id")
    if not office_id:
        raise ValueError("Cannot update a timeseries group without an office id")

    endpoint = f"timeseries/group/{group_id}"
    params = {
        "replace-assigned-ts": replace_assigned_ts,
        "office": office_id,
    }

    api.patch(endpoint=endpoint, data=data, params=params)


def timeseries_group_df_to_json(
    data: pd.DataFrame,
    group_id: str,
    office_id: str,
    category_id: str,
) -> JSON:
    """
    Convert a timeseries-group DataFrame to the JSON dictionary CDA expects.

    Parameters
    ----------
    data : pd.DataFrame
        Timeseries information. Required columns: ``officeId`` and
        ``timeseriesId``. Optional columns: ``aliasId`` (defaults to None),
        ``attribute`` (defaults to 0), and ``tsCode`` (a row's ``tsCode`` key
        is omitted from the output when the value is missing/NaN).
        The input frame is not modified.
    group_id : str
        The group ID for the timeseries.
    office_id : str
        The ID of the office associated with the specified timeseries.
    category_id : str
        The ID of the category associated with the group.

    Returns
    -------
    JSON
        JSON dictionary of the timeseries group data.

    Raises
    ------
    TypeError
        If a required column is missing from ``data``.
    ValueError
        If a required column contains null/NaN values.
    """
    required_columns = ["officeId", "timeseriesId"]
    optional_columns = ["aliasId", "attribute", "tsCode"]
    for column in required_columns:
        if column not in data.columns:
            raise TypeError(
                f"{column} is a required column in data when posting as a dataframe"
            )

    if data[required_columns].isnull().any().any():
        raise ValueError(
            f"Null/NaN values found in required columns: {required_columns}. "
        )

    # Work on a copy so the caller's DataFrame is not mutated by the
    # column fills below.
    data = data.copy()

    # Fill optional columns with default values if missing
    if "aliasId" not in data.columns:
        data["aliasId"] = None
    if "attribute" not in data.columns:
        data["attribute"] = 0

    # Replace NaN with None for optional columns
    for column in optional_columns:
        if column in data.columns:
            data[column] = data[column].where(pd.notnull(data[column]), None)

    # Build the list of time-series entries. Guard the empty case:
    # DataFrame.apply(axis=1) on an empty frame does not return a Series
    # with a .tolist() of row dicts.
    if data.empty:
        assigned_time_series = []
    else:
        assigned_time_series = data.apply(
            lambda entry: {
                "office-id": entry["officeId"],
                "timeseries-id": entry["timeseriesId"],
                "alias-id": entry["aliasId"],
                "attribute": entry["attribute"],
                **(
                    {"tsCode": entry["tsCode"]}
                    if "tsCode" in entry and pd.notna(entry["tsCode"])
                    else {}
                ),
            },
            axis=1,
        ).tolist()

    # Construct the final JSON dictionary
    json_dict = {
        "office-id": office_id,
        "id": group_id,
        "time-series-category": {"office-id": office_id, "id": category_id},
        "assigned-time-series": assigned_time_series,
    }

    return json_dict


def get_timeseries_group(group_id: str, category_id: str, office_id: str) -> Data:
"""Retreives time series stored in the requested time series group

Expand Down
66 changes: 66 additions & 0 deletions tests/timeseries/timeseries_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,72 @@ def init_session():
cwms.api.init_session(api_root=_MOCK_ROOT)


def test_update_timeseries_groups(requests_mock):
    """update_timeseries_groups should issue exactly one PATCH request."""
    mock_url = (
        f"{_MOCK_ROOT}/timeseries/group/USGS%20TS%20Data%20Acquisition"
        "?replace-assigned-ts=True&office=CWMS"
    )
    requests_mock.patch(mock_url, status_code=200)

    timeseries.update_timeseries_groups(
        "USGS TS Data Acquisition", "CWMS", True, _TS_GROUP
    )

    assert requests_mock.called
    assert requests_mock.call_count == 1


def test_timeseries_group_df_to_json_valid_data():
    """timeseries_group_df_to_json builds the expected JSON from a DataFrame."""
    frame = pd.DataFrame(
        {
            "officeId": ["office123", "office456"],
            "timeseriesId": ["ts1", "ts2"],
            "aliasId": [None, "alias2"],
            "attribute": [0, 10],
            "tsCode": ["code1", None],
        }
    )

    # Mirror caller-side cleanup: drop rows missing required values and
    # normalize NaN -> None in the optional columns.
    frame = frame.dropna(subset=["officeId", "timeseriesId"])
    for optional in ("aliasId", "tsCode"):
        if optional in frame.columns:
            frame[optional] = frame[optional].where(
                pd.notnull(frame[optional]), None
            )

    expected = {
        "office-id": "office123",
        "id": "group123",
        "time-series-category": {
            "office-id": "office123",
            "id": "cat123",
        },
        "assigned-time-series": [
            {
                "office-id": "office123",
                "timeseries-id": "ts1",
                "alias-id": None,
                "attribute": 0,
                "tsCode": "code1",
            },
            {
                "office-id": "office456",
                "timeseries-id": "ts2",
                "alias-id": "alias2",
                "attribute": 10,
            },
        ],
    }

    actual = timeseries.timeseries_group_df_to_json(
        frame, "group123", "office123", "cat123"
    )
    assert actual == expected


def test_timeseries_df_to_json():
test_json = {
"name": "TestLoc.Stage.Inst.1Hour.0.Testing",
Expand Down
Loading