8 changes: 4 additions & 4 deletions .github/workflows/CDA-testing.yml
@@ -2,9 +2,9 @@ name: CI

on:
push:
branches: [ main, githubAction-testing ]
branches: [main, githubAction-testing]
pull_request:
branches: [ main ]
branches: [main]
workflow_dispatch:

jobs:
@@ -20,7 +20,7 @@ jobs:
- name: Set Up Python
uses: actions/setup-python@v6
with:
python-version: '3.x'
python-version: '3.9.X'
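Note: pinning CI to 3.9 presumably keeps tests on the oldest supported interpreter; it lines up with the typing change in cwms/projects/water_supply/accounting.py below, where a PEP 604 `str | int` annotation becomes `typing.Union[str, int]` so the code still works before Python 3.10.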

# Unlike the code-check workflow, this job requires the dev dependencies to be
# installed to make sure we have the necessary tools, stub files, etc.
@@ -62,4 +62,4 @@
with:
file: ./code-coverage-results.md
vars: |-
empty: empty
empty: empty
40 changes: 24 additions & 16 deletions cwms/api.py
@@ -305,6 +305,25 @@ def get_with_paging(
return response


def _post_function(
endpoint: str,
data: Any,
params: Optional[RequestParams] = None,
*,
api_version: int = API_VERSION,
) -> Any:

# POST requires different headers than GET does
headers = {"accept": "*/*", "Content-Type": api_version_text(api_version)}
if isinstance(data, dict) or isinstance(data, list):
data = json.dumps(data)
with SESSION.post(endpoint, params=params, headers=headers, data=data) as response:
if not response.ok:
logging.error(f"CDA Error: response={response}")
raise ApiError(response)
return response


def post(
endpoint: str,
data: Any,
@@ -329,11 +348,7 @@ def post(
Raises:
ApiError: If an error response is returned by the API.
"""

post_with_returned_data(
endpoint=endpoint, data=data, params=params, api_version=api_version
)
return None
_post_function(endpoint=endpoint, data=data, params=params, api_version=api_version)


def post_with_returned_data(
@@ -361,17 +376,10 @@ def post_with_returned_data(
ApiError: If an error response is returned by the API.
"""

# post requires different headers than get for
headers = {"accept": "*/*", "Content-Type": api_version_text(api_version)}

if isinstance(data, dict) or isinstance(data, list):
data = json.dumps(data)

with SESSION.post(endpoint, params=params, headers=headers, data=data) as response:
if not response.ok:
logging.error(f"CDA Error: response={response}")
raise ApiError(response)
return _process_response(response)
response = _post_function(
endpoint=endpoint, data=data, params=params, api_version=api_version
)
return _process_response(response)
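
Note: with this refactor both public helpers delegate to `_post_function`; a minimal usage sketch, assuming illustrative endpoint names and payloads (not taken from this PR):

```python
import cwms.api as api

# Fire-and-forget: post() raises ApiError on failure and returns None.
api.post("timeseries", data={"name": "Some.Ts.Id", "office-id": "SWT", "values": []})

# When the response body matters, post_with_returned_data() parses and returns it.
created = api.post_with_returned_data(
    "projects", data={"office-id": "SWT", "name": "Some-Project"}
)
```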


def patch(
4 changes: 3 additions & 1 deletion cwms/projects/water_supply/accounting.py
@@ -2,6 +2,8 @@
# United States Army Corps of Engineers - Hydrologic Engineering Center (USACE/HEC)
# All Rights Reserved. USACE PROPRIETARY/CONFIDENTIAL.
# Source may not be released without written approval from HEC
from typing import Union

import cwms.api as api
from cwms.cwms_types import JSON, Data

@@ -74,7 +76,7 @@ def get_pump_accounting(

endpoint = f"projects/{office_id}/{project_id}/water-user/{water_user}/contracts/{contract_name}/accounting"

params: dict[str, str | int] = {
params: dict[str, Union[str, int]] = {
"start": start,
"end": end,
"timezone": timezone,
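
Note: a short sketch of why this change matters, assuming the package must import and type-check cleanly under the Python 3.9 now pinned in CI; PEP 604 `|` unions are only valid at runtime from 3.10:

```python
from typing import Union

# Annotations using `str | int` raise TypeError wherever they are actually
# evaluated on Python 3.9; typing.Union spells the same type and works on 3.9.
Portable = dict[str, Union[str, int]]   # OK on 3.9+ (PEP 585 builtin generics)
# Py310Plus = dict[str, str | int]      # TypeError if evaluated under 3.9
```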
163 changes: 45 additions & 118 deletions cwms/timeseries/timeseries.py
@@ -1,4 +1,5 @@
import concurrent.futures
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
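
Note: the print calls in this module become logging calls below; a minimal sketch of how a consuming script would surface them, assuming the library leaves handler configuration to the caller, as is conventional:

```python
import logging

# Show the library's messages: DEBUG also surfaces the chunking/thread-count
# notices added in this PR, while ERROR limits output to failures.
logging.basicConfig(level=logging.DEBUG)
```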

@@ -82,7 +83,7 @@ def get_ts_ids(ts_id: str) -> Any:
}
return result_dict
except Exception as e:
print(f"Error processing {ts_id}: {e}")
logging.error(f"Error processing {ts_id}: {e}")
return None

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -144,15 +145,20 @@ def chunk_timeseries_time_range(
return chunks


def get_timeseries_chunk(
selector: str, endpoint: str, param: Dict[str, Any], begin: datetime, end: datetime
) -> Data:
param["begin"] = begin.isoformat() if begin else None
param["end"] = end.isoformat() if end else None
response = api.get_with_paging(selector=selector, endpoint=endpoint, params=param)
return Data(response, selector=selector)


def fetch_timeseries_chunks(
chunks: List[Tuple[datetime, datetime]],
ts_id: str,
office_id: str,
unit: Optional[str],
datum: Optional[str],
page_size: Optional[int],
version_date: Optional[datetime],
trim: Optional[bool],
params: Dict[str, Any],
selector: str,
endpoint: str,
max_workers: int,
) -> List[Data]:
# Initialize an empty list to store results
@@ -164,15 +170,11 @@ def fetch_timeseries_chunks(
future_to_chunk = {
executor.submit(
get_timeseries_chunk,
ts_id,
office_id,
unit,
datum,
selector,
endpoint,
params.copy(),
chunk_start,
chunk_end,
page_size,
version_date,
trim,
): (chunk_start, chunk_end)
for chunk_start, chunk_end in chunks
}
@@ -184,52 +186,14 @@
result = future.result()
results.append(result)
except Exception as e:
# Log or handle any errors that occur during execution
chunk_start, chunk_end = future_to_chunk[future]
print(
f"ERROR: Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
# Log or handle any errors that occur during execution
logging.error(
f"Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
)
return results
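
Note: a sketch of how the refactored helpers compose (office, ts id, and dates are illustrative); `fetch_timeseries_chunks` now takes a prebuilt params dict and copies it per chunk so worker threads never share mutable state:

```python
from datetime import datetime, timedelta

begin, end = datetime(2023, 1, 1), datetime(2024, 1, 1)
chunks = chunk_timeseries_time_range(begin, end, timedelta(days=30))
params = {"office": "SWT", "name": "Some.Ts.Id", "unit": "EN", "page-size": 300000}
results = fetch_timeseries_chunks(chunks, params, "values", "timeseries", max_workers=4)
data = combine_timeseries_results(results)
```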


def get_timeseries_chunk(
ts_id: str,
office_id: str,
unit: Optional[str] = "EN",
datum: Optional[str] = None,
begin: Optional[datetime] = None,
end: Optional[datetime] = None,
page_size: Optional[int] = 300000,
version_date: Optional[datetime] = None,
trim: Optional[bool] = True,
) -> Data:

# creates the dataframe from the timeseries data
endpoint = "timeseries"
if begin and not isinstance(begin, datetime):
raise ValueError("begin needs to be in datetime")
if end and not isinstance(end, datetime):
raise ValueError("end needs to be in datetime")
if version_date and not isinstance(version_date, datetime):
raise ValueError("version_date needs to be in datetime")
params = {
"office": office_id,
"name": ts_id,
"unit": unit,
"datum": datum,
"begin": begin.isoformat() if begin else None,
"end": end.isoformat() if end else None,
"page-size": page_size,
"page": None,
"version-date": version_date.isoformat() if version_date else None,
"trim": trim,
}
selector = "values"

response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params)
return Data(response, selector=selector)


def combine_timeseries_results(results: List[Data]) -> Data:
"""
Combines the results from multiple chunks into a single cwms Data object.
@@ -348,7 +312,7 @@ def get_timeseries(
"""

selector = "values"

endpoint = "timeseries"
params = {
"office": office_id,
"name": ts_id,
@@ -371,25 +335,25 @@
# replace begin with begin extent if outside extents
if begin < begin_extent:
begin = begin_extent
print(
f"INFO: Requested begin was before any data in this timeseries. Reseting to {begin}"
logging.debug(
f"Requested begin was before any data in this timeseries. Reseting to {begin}"
)
except Exception as e:
# If getting extents fails, fall back to single-threaded mode
print(
f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode."
logging.debug(
f"Could not retrieve time series extents ({e}). Falling back to single-threaded mode."
)

Collaborator: I would actually call this an error instead of debug, because it means the timeseries-extents call is failing for some reason.

Collaborator (author): It could be an error, but the request still goes through; it just doesn't use the multithread path. I don't want an error message to appear when the request completes. If we killed the entire request, then yes, we would log an error.

Collaborator: Sounds good.

response = api.get_with_paging(
selector=selector, endpoint="timeseries", params=params
selector=selector, endpoint=endpoint, params=params
)
return Data(response, selector=selector)

# divide the time range into chunks
chunks = chunk_timeseries_time_range(begin, end, timedelta(days=max_days_per_chunk))

# clamp the worker count to the number of chunks, but never below one
max_workers = min(len(chunks), max_workers)
max_workers = max(min(len(chunks), max_workers), 1)

# if not multithread
if max_workers == 1 or not multithread:
Expand All @@ -398,19 +362,15 @@ def get_timeseries(
)
return Data(response, selector=selector)
else:
print(
f"INFO: Fetching {len(chunks)} chunks of timeseries data with {max_workers} threads"
logging.debug(
f"Fetching {len(chunks)} chunks of timeseries data with {max_workers} threads"
)
# fetch the data
result_list = fetch_timeseries_chunks(
chunks,
ts_id,
office_id,
unit,
datum,
page_size,
version_date,
trim,
params,
selector,
endpoint,
max_workers,
)
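
Note: end to end, the multithreaded read now looks roughly like this to a caller (parameter names as they appear in this diff; the ts id and office are made up):

```python
from datetime import datetime, timezone

ts = get_timeseries(
    ts_id="Some.Ts.Id",
    office_id="SWT",
    begin=datetime(2020, 1, 1, tzinfo=timezone.utc),
    end=datetime(2024, 1, 1, tzinfo=timezone.utc),
    multithread=True,   # long ranges are chunked and fetched in parallel
    max_workers=10,
)
```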

@@ -491,7 +451,9 @@ def timeseries_df_to_json(


def store_multi_timeseries_df(
data: pd.DataFrame, office_id: str, max_workers: Optional[int] = 30
data: pd.DataFrame,
office_id: str,
max_workers: Optional[int] = 30,
) -> None:
def store_ts_ids(
data: pd.DataFrame,
@@ -509,7 +471,7 @@ def store_ts_ids(
office_id=office_id,
version_date=version_date,
)
store_timeseries(data=data_json)
store_timeseries(data=data_json, multithread=multithread)
except Exception as e:
print(f"Error processing {ts_id}: {e}")
return None
@@ -575,42 +537,6 @@ def chunk_timeseries_data(
return chunk_list


def store_timeseries_chunk(
data: JSON,
create_as_ltrs: Optional[bool] = False,
store_rule: Optional[str] = None,
override_protection: Optional[bool] = False,
) -> None:
"""
Stores a single chunk of time series data.

Parameters
----------
chunk : list
A subset of time series values to be stored.
create_as_ltrs : bool
Flag indicating if timeseries should be created as Local Regular Time Series.
store_rule : str
The business rule to use when merging the incoming with existing data.
override_protection : bool
A flag to ignore the protected data quality when storing data.

Returns
-------
response
API response for the chunk storage.
"""
endpoint = "timeseries"
params = {
"create-as-lrts": create_as_ltrs,
"store-rule": store_rule,
"override-protection": override_protection,
}

# Make the API call
return api.post(endpoint, data, params)


def store_timeseries(
data: JSON,
create_as_ltrs: Optional[bool] = False,
@@ -667,8 +593,8 @@ def store_timeseries(
return api.post(endpoint, data, params)

actual_workers = min(max_workers, len(chunks))
print(
f"INFO: Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads"
logging.debug(
f"Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads"
)

# Store chunks concurrently
@@ -679,11 +605,10 @@
# Submit each chunk as a separate task to the executor
for chunk in chunks:
future = executor.submit(
store_timeseries_chunk, # The function to execute
chunk, # The chunk of data to store
create_as_ltrs, # Whether to create as LRTS
store_rule, # The store rule to use
override_protection, # Whether to override protection
api.post, # The function to execute
endpoint,
chunk, # The chunk of data to store
params,
)
futures.append(future) # Add the future to the list

@@ -693,7 +618,9 @@
except Exception as e:
start_time = chunk["values"][0][0]
end_time = chunk["values"][-1][0]
print(f"Error storing chunk from {start_time} to {end_time}: {e}")
logging.error(
f"Error storing chunk from {start_time} to {end_time}: {e}"
)
responses.append({"error": str(e)})

return
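
Note: with `store_timeseries_chunk` removed, each chunk is now POSTed directly through `api.post`; a caller-side sketch, assuming `ts_json` is CDA timeseries JSON (e.g. from `timeseries_df_to_json`, shape not shown here) and that "REPLACE ALL" is the desired store rule:

```python
# Large payloads are split by chunk_timeseries_data() and each chunk is
# submitted to the thread pool as api.post(endpoint, chunk, params).
store_timeseries(data=ts_json, store_rule="REPLACE ALL", max_workers=10)
```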