36 changes: 35 additions & 1 deletion cwms/catalog/catalog.py
@@ -1,4 +1,7 @@
from typing import Optional
from datetime import datetime
from typing import Optional, Tuple

import pandas as pd

import cwms.api as api
from cwms.cwms_types import Data
@@ -130,3 +133,34 @@ def get_timeseries_catalog(

response = api.get(endpoint=endpoint, params=params, api_version=2)
return Data(response, selector="entries")


def get_ts_extents(ts_id: str, office_id: str) -> Tuple[datetime, datetime, datetime]:
"""Retrieves earliest extent, latest extent, and last update via cwms.get_timeseries_catalog

Parameters
----------
ts_id: string
Timeseries id to query.
office_id: string
The owning office of the timeseries.

Returns
-------
tuple of datetime objects (earliest_time, latest_time, last_update)
"""
cwms_cat = get_timeseries_catalog(
office_id=office_id,
like=ts_id,
timeseries_group_like=None,
page_size=500,
include_extents=True,
).df

times = cwms_cat[cwms_cat.name == ts_id].extents.values[0][0]

earliest_time = pd.to_datetime(times["earliest-time"])
latest_time = pd.to_datetime(times["latest-time"])
last_update = pd.to_datetime(times["last-update"])

return earliest_time, latest_time, last_update
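
A minimal usage sketch for the new helper, assuming the CWMS Data API session has been configured elsewhere; the time series id and office below are hypothetical:

from cwms.catalog.catalog import get_ts_extents

# Hypothetical identifiers; assumes the API session (cwms.api) is already configured.
earliest, latest, last_update = get_ts_extents(
    ts_id="SomeLocation.Flow.Inst.1Hour.0.rev",
    office_id="SWT",
)
print(f"Data available from {earliest} to {latest}; last stored {last_update}")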
270 changes: 242 additions & 28 deletions cwms/timeseries/timeseries.py
@@ -1,11 +1,14 @@
import concurrent.futures
from datetime import datetime
from typing import Any, Dict, Optional
import math
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
from pandas import DataFrame

import cwms.api as api
from cwms.catalog.catalog import get_ts_extents
from cwms.cwms_types import JSON, Data


@@ -71,6 +74,7 @@ def get_ts_ids(ts_id: str) -> Any:
begin=begin,
end=end,
version_date=version_date_dt,
multithread=False,
)
result_dict = {
"ts_id": ts_id,
@@ -123,6 +127,9 @@ def get_timeseries(
page_size: Optional[int] = 300000,
version_date: Optional[datetime] = None,
trim: Optional[bool] = True,
multithread: Optional[bool] = True,
max_threads: int = 30,
max_days_per_chunk: int = 14,
) -> Data:
"""Retrieves time series values from a specified time series and time window. Value date-times
obtained are always in UTC.
@@ -161,35 +168,149 @@ def get_timeseries(
the timeseries is versioned, the query will return the max aggregate for the time period.
trim: boolean, optional, default is True
Specifies whether to trim missing values from the beginning and end of the retrieved values.
multithread: boolean, optional, default is True
Specifies whether to split the requested time window into chunks and retrieve them in parallel threads.
max_threads: integer, default is 30
The maximum number of threads that will be spawned for multithreaded retrieval.
max_days_per_chunk: integer, default is 14
The maximum number of days of data requested by a single thread.
Returns
-------
cwms Data type. data.json returns the JSON output and data.df returns a DataFrame. All dates are in UTC.
"""

# creates the dataframe from the timeseries data
def _ensure_utc_datetime(dt: Optional[datetime]) -> Optional[datetime]:
"""Convert datetime to UTC, preserving None values."""
if dt is None:
return None
if not isinstance(dt, datetime):
raise ValueError(f"Expected datetime object, got {type(dt)}")
return (
dt.replace(tzinfo=timezone.utc)
if dt.tzinfo is None
else dt.astimezone(timezone.utc)
)

def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]:
params = {
"office": office_id,
"name": ts_id,
"unit": unit,
"datum": datum,
"begin": begin.isoformat() if begin else None,
"end": end.isoformat() if end else None,
"page-size": page_size,
"page": None,
"version-date": version_date.isoformat() if version_date else None,
"trim": trim,
}
response = api.get_with_paging(
selector=selector, endpoint=endpoint, params=params
)
return dict(response)

endpoint = "timeseries"
if begin and not isinstance(begin, datetime):
raise ValueError("begin needs to be in datetime")
if end and not isinstance(end, datetime):
raise ValueError("end needs to be in datetime")
if version_date and not isinstance(version_date, datetime):
raise ValueError("version_date needs to be in datetime")
params = {
"office": office_id,
"name": ts_id,
"unit": unit,
"datum": datum,
"begin": begin.isoformat() if begin else None,
"end": end.isoformat() if end else None,
"page-size": page_size,
"page": None,
"version-date": version_date.isoformat() if version_date else None,
"trim": trim,
}
selector = "values"

response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params)
return Data(response, selector=selector)
# validate begin and end times or default to 1 day lookback
end = _ensure_utc_datetime(end) or datetime.now(timezone.utc)
begin = _ensure_utc_datetime(begin) or (end - timedelta(days=1))
version_date = _ensure_utc_datetime(version_date)

# Try to get extents for multithreading, fall back to single-threaded if it fails
if multithread:
try:
begin_extent, end_extent, last_update = get_ts_extents(
ts_id=ts_id, office_id=office_id
)
if begin.replace(tzinfo=timezone.utc) < begin_extent:
begin = begin_extent
except Exception as e:
# If getting extents fails, fall back to single-threaded mode
print(
f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode."
)
response = _call_api_for_range(begin, end)
return Data(response, selector=selector)

total_days = (end - begin).total_seconds() / (24 * 3600)

# split into chunks of at most max_days_per_chunk days each (at least one chunk);
# the number of concurrent threads is capped separately at max_threads
required_chunks = max(1, math.ceil(total_days / max_days_per_chunk))

# Limit the number of threads to max_threads
actual_threads = min(required_chunks, max_threads)

# if multithread is off or if you can get the data in just one chunk, use single thread
if required_chunks == 1 or not multithread:
response = _call_api_for_range(begin, end)
return Data(response, selector=selector)

print(
f"INFO: Getting data with {actual_threads} threads. Downloading {required_chunks} required chunks."
)

# Create time ranges based on max_days_per_chunk (not number of threads)
ranges: List[Tuple[datetime, datetime]] = []
current_begin = begin

for i in range(required_chunks):
current_end = min(current_begin + timedelta(days=max_days_per_chunk), end)
ranges.append((current_begin, current_end))
current_begin = current_end

if current_begin >= end:
break

# perform parallel requests
responses: List[Optional[Dict[str, Any]]] = [None] * len(ranges)
with concurrent.futures.ThreadPoolExecutor(max_workers=actual_threads) as executor:
future_to_idx = {
executor.submit(_call_api_for_range, r[0], r[1]): idx
for idx, r in enumerate(ranges)
}
for fut in concurrent.futures.as_completed(future_to_idx):
idx = future_to_idx[fut]
try:
responses[idx] = fut.result()
except Exception:
# fail fast: re-raise so the caller sees the error
raise

# responses are now in the same order as ranges
sorted_responses = [resp for resp in responses if resp is not None]

# Merge JSON "values" lists (assumes top-level "values" list present)
merged_json: Dict[str, Any] = {}
# merge metadata from first response
if sorted_responses:
merged_json.update(
{k: v for k, v in sorted_responses[0].items() if k != "values"}
)
merged_values: List[Any] = []
for resp in sorted_responses:
vals = resp.get("values") or []
merged_values.extend(vals)
# try to deduplicate by date-time
try:
# preserve order and dedupe by date-time string
seen = set()
deduped = []
for v in merged_values:
dt = v.get("date-time")
if dt not in seen:
seen.add(dt)
deduped.append(v)
merged_json["values"] = deduped
except Exception:
# if deduplication fails, fall back to storing all merged values
merged_json["values"] = merged_values
else:
merged_json["values"] = []

final_data = Data(merged_json, selector=selector)

return final_data
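
A hedged usage sketch of the chunked retrieval path above, with hypothetical identifiers; the keyword arguments shown are the ones added in this change, and a long window is split into chunks of at most max_days_per_chunk days fetched on up to max_threads threads:

from datetime import datetime, timezone

from cwms.timeseries.timeseries import get_timeseries

# Hypothetical identifiers; assumes the API session is already configured.
data = get_timeseries(
    ts_id="SomeLocation.Flow.Inst.1Hour.0.rev",
    office_id="SWT",
    begin=datetime(2023, 1, 1, tzinfo=timezone.utc),
    end=datetime(2024, 1, 1, tzinfo=timezone.utc),
    multithread=True,       # split the window into chunks of <= max_days_per_chunk days
    max_threads=10,         # cap the thread pool below the default of 30
    max_days_per_chunk=30,  # larger chunks mean fewer requests
)
df = data.df     # merged, date-time-deduplicated values as a DataFrame
raw = data.json  # merged JSON with a single "values" list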


def timeseries_df_to_json(
@@ -280,7 +401,7 @@ def store_ts_ids(
office_id=office_id,
version_date=version_date,
)
store_timeseries(data=data_json)
store_timeseries(data=data_json, multithread=False)
except Exception as e:
print(f"Error processing {ts_id}: {e}")
return None
@@ -317,7 +438,11 @@ def store_timeseries(
create_as_ltrs: Optional[bool] = False,
store_rule: Optional[str] = None,
override_protection: Optional[bool] = False,
) -> None:
multithread: Optional[bool] = True,
max_threads: int = 30,
max_values_per_chunk: int = 700,
max_retries: int = 3,
) -> Optional[List[Dict[str, Any]]]:
"""Will Create new TimeSeries if not already present. Will store any data provided

Parameters
@@ -335,23 +460,112 @@
DELETE_INSERT.
override_protection: str, optional, default is False
A flag to ignore the protected data quality when storing data.
multithread: boolean, optional, default is True
Specifies whether to split large payloads into chunks and store them in parallel threads.
max_threads: integer, default is 30
The maximum number of threads that will be spawned.
max_values_per_chunk: integer, default is 700
The maximum number of values stored by a single thread.
max_retries: integer, default is 3
The maximum number of attempts made to store a failed chunk.

Returns
-------
None when the data is stored in a single request; otherwise a list of per-chunk result dictionaries.
"""

endpoint = "timeseries"
params = {
"create-as-lrts": create_as_ltrs,
"store-rule": store_rule,
"override-protection": override_protection,
}

def _store_single_chunk(chunk_data: JSON, attempt: int = 1) -> Dict[str, Any]:
"""Store a single chunk with retry logic."""
try:
api.post(endpoint, chunk_data, params)
return {"success": True, "attempt": attempt}
except Exception as e:
if attempt >= max_retries:
raise
time.sleep(0.5 * (2 ** (attempt - 1)))  # exponential backoff: 0.5s, 1s, 2s, ...
return _store_single_chunk(chunk_data, attempt + 1)

def _create_chunks(values: List[Any], metadata: Dict[str, Any]) -> List[JSON]:
"""Split values into chunks and create payloads."""
chunks = []
total_values = len(values)

for i in range(0, total_values, max_values_per_chunk):
chunk_values = values[i : i + max_values_per_chunk]
chunk_payload = dict(metadata)
chunk_payload["values"] = chunk_values
chunks.append(chunk_payload)

return chunks

def _store_chunks_parallel(chunks: List[JSON]) -> List[Dict[str, Any]]:
"""Store chunks using thread pool."""
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
# Submit all chunks
future_to_chunk = {
executor.submit(_store_single_chunk, chunk): idx
for idx, chunk in enumerate(chunks)
}

# Use a dictionary to collect results, then convert to list
chunk_results_dict: Dict[int, Dict[str, Any]] = {}

for future in concurrent.futures.as_completed(future_to_chunk):
chunk_idx = future_to_chunk[future]
try:
chunk_results_dict[chunk_idx] = future.result()
except Exception as e:
chunk = chunks[chunk_idx]
start_date, end_date = _get_date_range(chunk)
print(
f"ERROR: Chunk {chunk_idx} failed ({start_date} to {end_date}): {e}"
)
raise

# Convert to ordered list
return [chunk_results_dict[i] for i in range(len(chunks))]

def _get_date_range(chunk: JSON) -> Tuple[str, str]:
"""Extract date range from chunk for error reporting."""
try:
values = chunk.get("values", [])
if not values:
return ("No values", "No values")
return (values[0][0], values[-1][0])
except Exception:
return ("Error parsing dates", "Error parsing dates")

# Validate input
if not isinstance(data, dict):
raise ValueError("Cannot store a timeseries without a JSON data dictionary")
raise ValueError("Data must be a JSON dictionary")

values = data.get("values", [])
if not values:
raise ValueError("No values to store")

# Post in a single request when multithreading is off or the data fits in one chunk
if not multithread or len(values) <= max_values_per_chunk:
api.post(endpoint, data, params)
return None

# Multi-threaded store
threads = min(math.ceil(len(values) / max_values_per_chunk), max_threads)
print(f"INFO: Storing {len(values)} values with up to {threads} threads")

# Separate metadata from values
metadata = {k: v for k, v in data.items() if k != "values"}

# Create chunks
chunks = _create_chunks(values, metadata)
print(f"INFO: Created {len(chunks)} chunks, using up to {max_threads} threads")

# Store chunks in parallel
results = _store_chunks_parallel(chunks)

return api.post(endpoint, data, params)
print(f"SUCCESS: All {len(chunks)} chunks stored successfully")
return results
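
A sketch of storing a larger record through the chunked path, with hypothetical identifiers; the payload layout (a top-level "values" list of [date-time, value, quality-code] triplets with ISO-8601 date-time strings) is an assumption here, not taken from this diff. Payloads larger than max_values_per_chunk are split and posted in parallel, and each failed chunk is retried up to max_retries times with exponential backoff:

from datetime import datetime, timedelta, timezone

from cwms.timeseries.timeseries import store_timeseries

start = datetime(2024, 1, 1, tzinfo=timezone.utc)
payload = {
    # hypothetical metadata; assumed CWMS-style JSON layout
    "name": "SomeLocation.Flow.Inst.1Hour.0.rev",
    "office-id": "SWT",
    "units": "cms",
    # 2000 hourly values -> 3 chunks of at most 700 values each
    "values": [
        [(start + timedelta(hours=i)).isoformat(), 12.0 + 0.1 * i, 0]
        for i in range(2000)
    ],
}

results = store_timeseries(
    data=payload,
    multithread=True,          # chunked, parallel POSTs
    max_values_per_chunk=700,  # values per request
    max_retries=3,             # per-chunk retries with exponential backoff
)
# results is None for a single-request store, otherwise one result dict per chunk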


def delete_timeseries(