diff --git a/cwms/catalog/catalog.py b/cwms/catalog/catalog.py
index e6912ab3..8e0796d2 100644
--- a/cwms/catalog/catalog.py
+++ b/cwms/catalog/catalog.py
@@ -1,4 +1,7 @@
-from typing import Optional
+from datetime import datetime
+from typing import Optional, Tuple
+
+import pandas as pd
 
 import cwms.api as api
 from cwms.cwms_types import Data
@@ -130,3 +133,34 @@ def get_timeseries_catalog(
     response = api.get(endpoint=endpoint, params=params, api_version=2)
 
     return Data(response, selector="entries")
+
+
+def get_ts_extents(ts_id: str, office_id: str) -> Tuple[datetime, datetime, datetime]:
+    """Retrieves the earliest extent, latest extent, and last update via cwms.get_timeseries_catalog
+
+    Parameters
+    ----------
+    ts_id: string
+        Timeseries id to query.
+    office_id: string
+        The owning office of the timeseries.
+
+    Returns
+    -------
+    tuple of datetime objects (earliest_time, latest_time, last_update)
+    """
+    cwms_cat = get_timeseries_catalog(
+        office_id=office_id,
+        like=ts_id,
+        timeseries_group_like=None,
+        page_size=500,
+        include_extents=True,
+    ).df
+
+    times = cwms_cat[cwms_cat.name == ts_id].extents.values[0][0]
+
+    earliest_time = pd.to_datetime(times["earliest-time"])
+    latest_time = pd.to_datetime(times["latest-time"])
+    last_update = pd.to_datetime(times["last-update"])
+
+    return earliest_time, latest_time, last_update
diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 4d9a6eb0..8e87c496 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -1,11 +1,14 @@
 import concurrent.futures
-from datetime import datetime
-from typing import Any, Dict, Optional
+import math
+import time
+from datetime import datetime, timedelta, timezone
+from typing import Any, Dict, List, Optional, Tuple
 
 import pandas as pd
 from pandas import DataFrame
 
 import cwms.api as api
+from cwms.catalog.catalog import get_ts_extents
 from cwms.cwms_types import JSON, Data
@@ -71,6 +74,7 @@ def get_ts_ids(ts_id: str) -> Any:
                 begin=begin,
                 end=end,
                 version_date=version_date_dt,
+                multithread=False,
             )
             result_dict = {
                 "ts_id": ts_id,
@@ -123,6 +127,9 @@ def get_timeseries(
     page_size: Optional[int] = 300000,
     version_date: Optional[datetime] = None,
     trim: Optional[bool] = True,
+    multithread: Optional[bool] = True,
+    max_threads: int = 30,
+    max_days_per_chunk: int = 14,
 ) -> Data:
     """Retrieves time series values from a specified time series and time window. Value date-times
     obtained are always in UTC.
@@ -161,35 +168,149 @@
         the timeseries is versioned, the query will return the max aggregate for the time period.
     trim: boolean, optional, default is True
         Specifies whether to trim missing values from the beginning and end of the retrieved values.
+    multithread: boolean, optional, default is True
+        Specifies whether to split large requests into date-range chunks that are retrieved in parallel threads.
+    max_threads: integer, default is 30
+        The maximum number of threads that will be spawned for multithreading.
+    max_days_per_chunk: integer, default is 14
+        The maximum number of days of data requested by a single thread.
 
     Returns
     -------
     cwms data type. data.json will return the JSON output and data.df will return a dataframe.
     dates are all in UTC
     """
-    # creates the dataframe from the timeseries data
+    def _ensure_utc_datetime(dt: Optional[datetime]) -> Optional[datetime]:
+        """Convert datetime to UTC, preserving None values."""
+        if dt is None:
+            return None
+        if not isinstance(dt, datetime):
+            raise ValueError(f"Expected datetime object, got {type(dt)}")
+        return (
+            dt.replace(tzinfo=timezone.utc)
+            if dt.tzinfo is None
+            else dt.astimezone(timezone.utc)
+        )
+
+    def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]:
+        params = {
+            "office": office_id,
+            "name": ts_id,
+            "unit": unit,
+            "datum": datum,
+            "begin": begin.isoformat() if begin else None,
+            "end": end.isoformat() if end else None,
+            "page-size": page_size,
+            "page": None,
+            "version-date": version_date.isoformat() if version_date else None,
+            "trim": trim,
+        }
+        response = api.get_with_paging(
+            selector=selector, endpoint=endpoint, params=params
+        )
+        return dict(response)
+
     endpoint = "timeseries"
-    if begin and not isinstance(begin, datetime):
-        raise ValueError("begin needs to be in datetime")
-    if end and not isinstance(end, datetime):
-        raise ValueError("end needs to be in datetime")
-    if version_date and not isinstance(version_date, datetime):
-        raise ValueError("version_date needs to be in datetime")
-    params = {
-        "office": office_id,
-        "name": ts_id,
-        "unit": unit,
-        "datum": datum,
-        "begin": begin.isoformat() if begin else None,
-        "end": end.isoformat() if end else None,
-        "page-size": page_size,
-        "page": None,
-        "version-date": version_date.isoformat() if version_date else None,
-        "trim": trim,
-    }
     selector = "values"
-    response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params)
-    return Data(response, selector=selector)
+    # validate begin and end times or default to 1 day lookback
+    end = _ensure_utc_datetime(end) or datetime.now(timezone.utc)
+    begin = _ensure_utc_datetime(begin) or (end - timedelta(days=1))
+    version_date = _ensure_utc_datetime(version_date)
+
+    # Try to get extents for multithreading, fall back to single-threaded if it fails
+    if multithread:
+        try:
+            begin_extent, end_extent, last_update = get_ts_extents(
+                ts_id=ts_id, office_id=office_id
+            )
+            if begin.replace(tzinfo=timezone.utc) < begin_extent:
+                begin = begin_extent
+        except Exception as e:
+            # If getting extents fails, fall back to single-threaded mode
+            print(
+                f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode."
+            )
+            response = _call_api_for_range(begin, end)
+            return Data(response, selector=selector)
+
+    total_days = (end - begin).total_seconds() / (24 * 3600)
+
+    # split into N chunks where each chunk <= max_days_per_chunk, but cap chunks to max_threads
+    required_chunks = math.ceil(total_days / max_days_per_chunk)
+
+    # Limit the number of threads to max_threads
+    actual_threads = min(required_chunks, max_threads)
+
+    # if multithread is off or if you can get the data in just one chunk, use single thread
+    if required_chunks == 1 or not multithread:
+        response = _call_api_for_range(begin, end)
+        return Data(response, selector=selector)
+
+    print(
+        f"INFO: Getting data with {actual_threads} threads. Downloading {required_chunks} required chunks."
+    )
+
+    # Create time ranges based on max_days_per_chunk (not number of threads)
+    ranges: List[Tuple[datetime, datetime]] = []
+    current_begin = begin
+
+    for i in range(required_chunks):
+        current_end = min(current_begin + timedelta(days=max_days_per_chunk), end)
+        ranges.append((current_begin, current_end))
+        current_begin = current_end
+
+        if current_begin >= end:
+            break
+
+    # perform parallel requests
+    responses: List[Optional[Dict[str, Any]]] = [None] * len(ranges)
+    with concurrent.futures.ThreadPoolExecutor(max_workers=actual_threads) as executor:
+        future_to_idx = {
+            executor.submit(_call_api_for_range, r[0], r[1]): idx
+            for idx, r in enumerate(ranges)
+        }
+        for fut in concurrent.futures.as_completed(future_to_idx):
+            idx = future_to_idx[fut]
+            try:
+                responses[idx] = fut.result()
+            except Exception as exc:
+                # fail fast: re-raise so caller sees error (or change to continue/partial)
+                raise
+
+    # responses are now in the same order as ranges
+    sorted_responses = [resp for resp in responses if resp is not None]
+
+    # Merge JSON "values" lists (assumes top-level "values" list present)
+    merged_json: Dict[str, Any] = {}
+    # merge metadata from first response
+    if sorted_responses:
+        merged_json.update(
+            {k: v for k, v in sorted_responses[0].items() if k != "values"}
+        )
+        merged_values: List[Any] = []
+        for resp in sorted_responses:
+            vals = resp.get("values") or []
+            merged_values.extend(vals)
+        # try and deduplicate by date-time
+        try:
+            # preserve order and dedupe by date-time string
+            seen = set()
+            deduped = []
+            for v in merged_values:
+                dt = v.get("date-time")
+                if dt not in seen:
+                    seen.add(dt)
+                    deduped.append(v)
+            merged_json["values"] = deduped
+        except Exception:
+            # in case the dedup fails, just try and store it all
+            merged_json["values"] = merged_values
+    else:
+        merged_json["values"] = []
+
+    final_data = Data(merged_json, selector=selector)
+
+    return final_data
 
 
 def timeseries_df_to_json(
@@ -280,7 +401,7 @@ def store_ts_ids(
                 office_id=office_id,
                 version_date=version_date,
             )
-            store_timeseries(data=data_json)
+            store_timeseries(data=data_json, multithread=False)
         except Exception as e:
             print(f"Error processing {ts_id}: {e}")
             return None
@@ -317,7 +438,11 @@ def store_timeseries(
     create_as_ltrs: Optional[bool] = False,
     store_rule: Optional[str] = None,
     override_protection: Optional[bool] = False,
-) -> None:
+    multithread: Optional[bool] = True,
+    max_threads: int = 30,
+    max_values_per_chunk: int = 700,
+    max_retries: int = 3,
+) -> Optional[List[Dict[str, Any]]]:
     """Will Create new TimeSeries if not already present. Will store any data provided
 
     Parameters
     ----------
     data: JSON dictionary
         Time Series data to be stored. If dataframe data must be converted to JSON first.
     create_as_ltrs: bool, optional, defaults to False
         Flag indicating if timeseries should be created as Local Regular Time Series.
     store_rule: str, optional, default is None:
         The business rule to use when merging the incoming with existing data. Available values :
         REPLACE_ALL, DO_NOT_REPLACE, REPLACE_MISSING_VALUES_ONLY, REPLACE_WITH_NON_MISSING, DELETE_INSERT.
     override_protection: str, optional, default is False
         A flag to ignore the protected data quality when storing data.
+    multithread: bool, optional, default is True
+    max_threads: int, maximum number of threads to spawn for storing data
+    max_values_per_chunk: int, maximum number of values that will be stored by a single thread
+    max_retries: int, maximum number of store retries that will be attempted
 
     Returns
     -------
     response
     """
-
     endpoint = "timeseries"
     params = {
         "create-as-lrts": create_as_ltrs,
         "store-rule": store_rule,
         "override-protection": override_protection,
     }
 
+    def _store_single_chunk(chunk_data: JSON, attempt: int = 1) -> Dict[str, Any]:
+        """Store a single chunk with retry logic."""
+        try:
+            api.post(endpoint, chunk_data, params)
+            return {"success": True, "attempt": attempt}
+        except Exception as e:
+            if attempt >= max_retries:
+                raise
+            time.sleep(0.5 * (2 ** (attempt - 1)))  # Exponential backoff
+            return _store_single_chunk(chunk_data, attempt + 1)
+
+    def _create_chunks(values: List[Any], metadata: Dict[str, Any]) -> List[JSON]:
+        """Split values into chunks and create payloads."""
+        chunks = []
+        total_values = len(values)
+
+        for i in range(0, total_values, max_values_per_chunk):
+            chunk_values = values[i : i + max_values_per_chunk]
+            chunk_payload = dict(metadata)
+            chunk_payload["values"] = chunk_values
+            chunks.append(chunk_payload)
+
+        return chunks
+
+    def _store_chunks_parallel(chunks: List[JSON]) -> List[Dict[str, Any]]:
+        """Store chunks using thread pool."""
+        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
+            # Submit all chunks
+            future_to_chunk = {
+                executor.submit(_store_single_chunk, chunk): idx
+                for idx, chunk in enumerate(chunks)
+            }
+
+            # Use a dictionary to collect results, then convert to list
+            chunk_results_dict: Dict[int, Dict[str, Any]] = {}
+
+            for future in concurrent.futures.as_completed(future_to_chunk):
+                chunk_idx = future_to_chunk[future]
+                try:
+                    chunk_results_dict[chunk_idx] = future.result()
+                except Exception as e:
+                    chunk = chunks[chunk_idx]
+                    start_date, end_date = _get_date_range(chunk)
+                    print(
+                        f"ERROR: Chunk {chunk_idx} failed ({start_date} to {end_date}): {e}"
+                    )
+                    raise
+
+            # Convert to ordered list
+            return [chunk_results_dict[i] for i in range(len(chunks))]
+
+    def _get_date_range(chunk: JSON) -> Tuple[str, str]:
+        """Extract date range from chunk for error reporting."""
+        try:
+            values = chunk.get("values", [])
+            if not values:
+                return ("No values", "No values")
+            return (values[0][0], values[-1][0])
+        except Exception:
+            return ("Error parsing dates", "Error parsing dates")
+
+    # Validate input
     if not isinstance(data, dict):
-        raise ValueError("Cannot store a timeseries without a JSON data dictionary")
+        raise ValueError("Data must be a JSON dictionary")
+
+    values = data.get("values", [])
+    if not values:
+        raise ValueError("No values to store")
+
+    # Post with one thread for small datasets
+    if not multithread or len(values) <= max_values_per_chunk:
+        api.post(endpoint, data, params)
+        return None
+
+    # Multi-thread
+    threads = min(math.ceil(len(values) / max_values_per_chunk), max_threads)
+    print(f"INFO: Storing {len(values)} values with {threads} threads")
+
+    # Separate metadata from values
+    metadata = {k: v for k, v in data.items() if k != "values"}
+
+    # Create chunks
+    chunks = _create_chunks(values, metadata)
+    print(f"INFO: Created {len(chunks)} chunks, using up to {max_threads} threads")
+
+    # Store chunks in parallel
+    results = _store_chunks_parallel(chunks)
-    return api.post(endpoint, data, params)
+    print(f"SUCCESS: All {len(chunks)} chunks stored successfully")
+    return results
 
 
 def delete_timeseries(
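Usage sketches (not part of the patch). First, a minimal sketch of the new catalog helper: it assumes an already-configured cwms.api session, and the office id and timeseries id are placeholders rather than values taken from this change.

from cwms.catalog.catalog import get_ts_extents

# Placeholder identifiers -- substitute a real office and timeseries id.
earliest, latest, last_update = get_ts_extents(
    ts_id="Some.Location.Flow.Inst.1Hour.0.Raw",
    office_id="SWT",
)
print(f"data available {earliest} through {latest}, last written {last_update}")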
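Next, a hedged sketch of the parallel retrieval path, importing from the module path shown in this diff. The identifiers and time window are placeholders; with a roughly six-month window and 14-day chunks the request fans out into 13 chunks, capped at max_threads workers, and multithread=False preserves the original single-request behavior.

from datetime import datetime, timezone

from cwms.timeseries.timeseries import get_timeseries

data = get_timeseries(
    ts_id="Some.Location.Flow.Inst.1Hour.0.Raw",  # placeholder id
    office_id="SWT",  # placeholder office
    begin=datetime(2024, 1, 1, tzinfo=timezone.utc),
    end=datetime(2024, 7, 1, tzinfo=timezone.utc),
    multithread=True,  # default; False forces a single request
    max_threads=10,  # cap on worker threads
    max_days_per_chunk=14,  # each chunk covers at most 14 days
)
df = data.df  # merged, de-duplicated values with date-times in UTC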
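Finally, a sketch of the chunked store path. The payload keys ("name", "office-id", "units") and the [date-time, value, quality-code] row layout are assumptions about the JSON normally produced by timeseries_df_to_json, and the identifiers are again placeholders.

from cwms.timeseries.timeseries import store_timeseries

data_json = {
    "name": "Some.Location.Flow.Inst.1Hour.0.Raw",  # placeholder id
    "office-id": "SWT",  # placeholder office
    "units": "cms",
    "values": [
        ["2024-01-01T00:00:00Z", 1.0, 0],
        ["2024-01-01T01:00:00Z", 2.0, 0],
        # ... more rows; only payloads larger than max_values_per_chunk are split
    ],
}

# Payloads above max_values_per_chunk are split and POSTed in parallel; each failed
# chunk is retried up to max_retries times with exponential backoff.
results = store_timeseries(data=data_json, max_values_per_chunk=700, max_retries=3)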