From 09e5cdcaecc4e7f379e821e0a33e899f0ff647f6 Mon Sep 17 00:00:00 2001
From: msweier
Date: Tue, 9 Sep 2025 14:23:09 -0500
Subject: [PATCH 01/14] add get_ts_extents

---
 cwms/catalog/catalog.py | 35 +++++++++++++++++++++++++++++++++++
 1 file changed, 35 insertions(+)

diff --git a/cwms/catalog/catalog.py b/cwms/catalog/catalog.py
index e6912ab3..97fac4d1 100644
--- a/cwms/catalog/catalog.py
+++ b/cwms/catalog/catalog.py
@@ -3,6 +3,8 @@
 import cwms.api as api
 from cwms.cwms_types import Data
+from dateutil import parser
+
 
 
 def get_locations_catalog(
     office_id: str,
@@ -130,3 +132,36 @@ def get_timeseries_catalog(
     response = api.get(endpoint=endpoint, params=params, api_version=2)
 
     return Data(response, selector="entries")
+
+
+def get_ts_extents(ts_id=str, office_id=str):
+    """Retrieves earliest extent, latest extent, and last update via cwms.get_timeseries_catalog
+
+    Parameters
+    ----------
+    ts_id: string
+        Timeseries id to query.
+    office_id: string
+        The owning office of the timeseries.
+
+    Returns
+    -------
+    tuple of datetime objects (earliest_time, latest_time, last_update)
+    """
+    cwms_cat = get_timeseries_catalog(
+        office_id=office_id,
+        like=ts_id,
+        timeseries_group_like=None,
+        page_size=500,
+        include_extents=True,
+    ).df
+    earliest_time = parser.isoparse(
+        cwms_cat[cwms_cat.name == ts_id].extents.values[0][0]["earliest-time"]
+    )
+    latest_time = parser.isoparse(
+        cwms_cat[cwms_cat.name == ts_id].extents.values[0][0]["latest-time"]
+    )
+    last_update = parser.isoparse(
+        cwms_cat[cwms_cat.name == ts_id].extents.values[0][0]["last-update"]
+    )
+    return earliest_time, latest_time, last_update

From 423d06352fcd1e044a453cc20846d98910962bf8 Mon Sep 17 00:00:00 2001
From: msweier
Date: Tue, 9 Sep 2025 14:23:31 -0500
Subject: [PATCH 02/14] add multithread to get_timeseries

---
 cwms/timeseries/timeseries.py | 223 +++++++++++++++++++++++++---------
 1 file changed, 165 insertions(+), 58 deletions(-)

diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 4d9a6eb0..ef54d2f4 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -1,11 +1,13 @@
 import concurrent.futures
-from datetime import datetime
-from typing import Any, Dict, Optional
+from datetime import datetime, timedelta, timezone
+import math
+from typing import Any, Optional
 
 import pandas as pd
 from pandas import DataFrame
 
 import cwms.api as api
+from cwms.catalog.catalog import get_ts_extents
 from cwms.cwms_types import JSON, Data
 
 
@@ -123,73 +125,178 @@ def get_timeseries(
     page_size: Optional[int] = 300000,
     version_date: Optional[datetime] = None,
     trim: Optional[bool] = True,
+    multithread: Optional[bool] = True,
+    max_threads: int = 20,
+    max_days_per_chunk: int = 30,
 ) -> Data:
+    """Retrieves time series values from a specified time series and time window. Value date-times
-    obtained are always in UTC.
-
-    Parameters
-    ----------
-    ts_id: string
-        Name of the time series whose data is to be included in the response.
-    office_id: string
-        The owning office of the time series.
-    unit: string, optional, default is EN
-        The unit or unit system of the response. Defaults to EN. Valid values
-        for the unit field are:
-            1. EN. English unit system.
-            2. SI. SI unit system.
-            3. Other.
-    datum: string, optional, default is None
-        The elevation datum of the response. This field affects only elevation location
-        levels. Valid values for this field are:
-            1. NAVD88.
-            2. NGVD29.
- begin: datetime, optional, default is None - Start of the time window for data to be included in the response. If this field is - not specified, any required time window begins 24 hours prior to the specified - or default end time. Any timezone information should be passed within the datetime - object. If no timezone information is given, default will be UTC. - end: datetime, optional, default is None - End of the time window for data to be included in the response. If this field is - not specified, any required time window ends at the current time. Any timezone - information should be passed within the datetime object. If no timezone information - is given, default will be UTC. - page_size: int, optional, default is 300000: Specifies the number of records to obtain in - a single call. - version_date: datetime, optional, default is None - Version date of time series values being requested. If this field is not specified and - the timeseries is versioned, the query will return the max aggregate for the time period. - trim: boolean, optional, default is True - Specifies whether to trim missing values from the beginning and end of the retrieved values. - Returns - ------- - cwms data type. data.json will return the JSON output and data.df will return a dataframe. dates are all in UTC - """ + obtained are always in UTC. + + Parameters + ---------- + ts_id: string + Name of the time series whose data is to be included in the response. + office_id: string + The owning office of the time series. + unit: string, optional, default is EN + The unit or unit system of the response. Defaults to EN. Valid values + for the unit field are: + 1. EN. English unit system. + 2. SI. SI unit system. + 3. Other. + datum: string, optional, default is None + The elevation datum of the response. This field affects only elevation location + levels. Valid values for this field are: + 1. NAVD88. + 2. NGVD29. + begin: datetime, optional, default is None + Start of the time window for data to be included in the response. If this field is + not specified, any required time window begins 24 hours prior to the specified + or default end time. Any timezone information should be passed within the datetime + object. If no timezone information is given, default will be UTC. + end: datetime, optional, default is None + End of the time window for data to be included in the response. If this field is + not specified, any required time window ends at the current time. Any timezone + information should be passed within the datetime object. If no timezone information + is given, default will be UTC. + page_size: int, optional, default is 300000: Specifies the number of records to obtain in + a single call. + version_date: datetime, optional, default is None + Version date of time series values being requested. If this field is not specified and + the timeseries is versioned, the query will return the max aggregate for the time period. + trim: boolean, optional, default is True + Specifies whether to trim missing values from the beginning and end of the retrieved values. + multithread: boolean, optional, default is False + Specifies whether to trim missing values from the beginning and end of the retrieved values. + max_threads: integer, default is 20 + The maximum number of threads that will be spawned for multithreading + max_days_per_chunk: integer, default is 30 + The maximum number of days that would be included in a thread + Returns + ------- + cwms data type. 
data.json will return the JSON output and data.df will return a dataframe. dates are all in UTC + """ - # creates the dataframe from the timeseries data endpoint = "timeseries" + selector = "values" + if begin and not isinstance(begin, datetime): raise ValueError("begin needs to be in datetime") if end and not isinstance(end, datetime): raise ValueError("end needs to be in datetime") if version_date and not isinstance(version_date, datetime): raise ValueError("version_date needs to be in datetime") - params = { - "office": office_id, - "name": ts_id, - "unit": unit, - "datum": datum, - "begin": begin.isoformat() if begin else None, - "end": end.isoformat() if end else None, - "page-size": page_size, - "page": None, - "version-date": version_date.isoformat() if version_date else None, - "trim": trim, - } - selector = "values" - response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params) - return Data(response, selector=selector) + # default end to now if not provided + if end is None: + end = datetime.now(timezone.utc) + else: + end = end.replace(tzinfo=timezone.utc) + if begin is None: + # keep original behavior: default window begins 24 hours prior to end + begin = end - timedelta(days=1) + else: + begin = begin.replace(tzinfo=timezone.utc) + + def _call_api_for_range(begin, end): + params = { + "office": office_id, + "name": ts_id, + "unit": unit, + "datum": datum, + "begin": begin.isoformat() if begin else None, + "end": end.isoformat() if end else None, + "page-size": page_size, + "page": None, + "version-date": version_date.isoformat() if version_date else None, + "trim": trim, + } + return api.get_with_paging(selector=selector, endpoint=endpoint, params=params) + + # if multithread disabled or short range, do a single call + if not multithread: + response = _call_api_for_range(begin, end) + return Data(response, selector=selector) + + begin_extent, end_extent, last_update = get_ts_extents( + ts_id=ts_id, office_id=office_id + ) + print("begin", type(begin), begin) + print("begin_extent", type(begin_extent), begin_extent) + print(begin_extent, end_extent, last_update) + if begin.replace(tzinfo=timezone.utc) < begin_extent: + begin = begin_extent + + total_days = (end - begin).total_seconds() / (24 * 3600) + + # split into N chunks where each chunk <= max_days_per_chunk, but cap chunks to max_threads + required_chunks = math.ceil(total_days / max_days_per_chunk) + print("required_chunks=", required_chunks) + chunks = min(required_chunks, max_threads) + + if total_days <= max_days_per_chunk: + response = _call_api_for_range(begin, end) + return Data(response, selector=selector) + + # create roughly equal ranges + chunk_seconds = (end - begin).total_seconds() / chunks + ranges = [] + for i in range(chunks): + b = begin + timedelta(seconds=math.floor(i * chunk_seconds)) + e = begin + timedelta(seconds=math.floor((i + 1) * chunk_seconds)) + if i == chunks - 1: + e = end + ranges.append((b, e)) + + # perform parallel requests + responses = [None] * len(ranges) + with concurrent.futures.ThreadPoolExecutor(max_workers=chunks) as executor: + future_to_idx = { + executor.submit(_call_api_for_range, r[0], r[1]): idx + for idx, r in enumerate(ranges) + } + for fut in concurrent.futures.as_completed(future_to_idx): + idx = future_to_idx[fut] + try: + responses[idx] = fut.result() + except Exception as exc: + # fail fast: re-raise so caller sees error (or change to continue/partial) + raise + + # responses are now in the same order as ranges + sorted_responses = [resp for 
resp in responses if resp is not None] + + # Merge JSON "values" lists (assumes top-level "values" list present) + merged_json = {} + # merge metadata from first response (you can adjust which metadata to prefer) + if sorted_responses: + merged_json.update( + {k: v for k, v in sorted_responses[0].items() if k != "values"} + ) + merged_values = [] + for resp in sorted_responses: + vals = resp.get("values") or [] + merged_values.extend(vals) + # optionally deduplicate by date-time + try: + # preserve order and dedupe by date-time string + seen = set() + deduped = [] + for v in merged_values: + dt = v.get("date-time") + if dt not in seen: + seen.add(dt) + deduped.append(v) + merged_json["values"] = deduped + except Exception: + merged_json["values"] = merged_values + else: + merged_json["values"] = [] + + final_data = Data(merged_json, selector=selector) + + return final_data def timeseries_df_to_json( From b491403a6ec350cbef031a202196a447025510d0 Mon Sep 17 00:00:00 2001 From: msweier Date: Wed, 10 Sep 2025 12:51:51 -0500 Subject: [PATCH 03/14] fix typing --- cwms/catalog/catalog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwms/catalog/catalog.py b/cwms/catalog/catalog.py index 97fac4d1..a6bdf70e 100644 --- a/cwms/catalog/catalog.py +++ b/cwms/catalog/catalog.py @@ -134,7 +134,7 @@ def get_timeseries_catalog( return Data(response, selector="entries") -def get_ts_extents(ts_id=str, office_id=str): +def get_ts_extents(ts_id: str, office_id: str): """Retrieves earliest extent, latest extent, and last update via cwms.get_timeseries_catalog Parameters From 96b5769e838242933af89d23a974adcce8760c96 Mon Sep 17 00:00:00 2001 From: msweier Date: Wed, 10 Sep 2025 12:52:29 -0500 Subject: [PATCH 04/14] add multi to ts post --- cwms/timeseries/timeseries.py | 77 +++++++++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 3 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index ef54d2f4..a2e50f36 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -1,7 +1,8 @@ import concurrent.futures from datetime import datetime, timedelta, timezone import math -from typing import Any, Optional +import time +from typing import Dict, Any, Optional import pandas as pd from pandas import DataFrame @@ -418,13 +419,39 @@ def store_ts_ids( store_ts_ids, ts_data, ts_id, office_id, version_date_dt ) +def _post_chunk_with_retries(endpoint: str, chunk_json: JSON, params: dict, max_retries: int = 3, backoff: float = 0.5): + last_exc = None + for attempt in range(1, max_retries + 1): + try: + resp = api.post(endpoint, chunk_json, params) + # if api.post returns a requests.Response-like object: + status = getattr(resp, "status_code", None) + body = getattr(resp, "json", lambda: None)() + if status is not None: + if 200 <= status < 300: + return {"ok": True, "status": status, "body": body} + # treat 409/4xx specially if you want merges to proceed + return {"ok": False, "status": status, "body": body} + # otherwise assume success if no exception + return {"ok": True, "status": None, "body": resp} + except Exception as e: + last_exc = e + if attempt == max_retries: + raise + time.sleep(backoff * (2 ** (attempt - 1))) + raise last_exc + def store_timeseries( data: JSON, create_as_ltrs: Optional[bool] = False, store_rule: Optional[str] = None, override_protection: Optional[bool] = False, -) -> None: + multithread: bool = True, + max_threads: int = 20, + max_values_per_chunk: int = 700, + max_retries: int = 3, +) -> None: """Will Create 
new TimeSeries if not already present. Will store any data provided Parameters @@ -458,7 +485,51 @@ def store_timeseries( if not isinstance(data, dict): raise ValueError("Cannot store a timeseries without a JSON data dictionary") - return api.post(endpoint, data, params) + values = data['values'] + total = len(values) + + # small payload: single post (preserve original behavior) + if (not multithread) or total <= max_values_per_chunk: + print('not multi threads***********') + return api.post(endpoint, data, params) + + # determine chunking + required_chunks = math.ceil(total / max_values_per_chunk) + chunks = min(required_chunks, max_threads) + + # preserve metadata keys except "values" + meta = {k: v for k, v in data.items() if k != "values"} + + # build chunk payloads (roughly equal sized) + chunk_payloads = [] + for i in range(chunks): + start = int(math.floor(i * total / chunks)) + end = int(math.floor((i + 1) * total / chunks)) if i < (chunks - 1) else total + chunk_vals = values[start:end] + chunk_json = dict(meta) + chunk_json["values"] = chunk_vals + chunk_payloads.append((i, chunk_json)) + + # post chunks in parallel and collect responses in order + responses = [None] * len(chunk_payloads) + with concurrent.futures.ThreadPoolExecutor(max_workers=chunks) as executor: + future_to_idx = { + executor.submit(_post_chunk_with_retries, endpoint, payload, params, max_retries): idx + for idx, (_, payload) in enumerate(chunk_payloads) + } + for fut in concurrent.futures.as_completed(future_to_idx): + idx = future_to_idx[fut] + responses[idx] = fut.result() # will raise if chunk failed after retries + + # after collecting responses list as above, responses is list of dicts per chunk + errors = [] + for idx, result in enumerate(responses): + if not result or not result.get("ok"): + errors.append((idx, result)) + if errors: + raise RuntimeError(f"Some chunks failed: {errors}") + + return responses def delete_timeseries( From 7b3f56f158461d7917c82b5b60d67c79d870beef Mon Sep 17 00:00:00 2001 From: msweier Date: Wed, 10 Sep 2025 14:16:38 -0500 Subject: [PATCH 05/14] clean up print statements --- cwms/timeseries/timeseries.py | 138 ++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 64 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index a2e50f36..4163258e 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -74,6 +74,7 @@ def get_ts_ids(ts_id: str) -> Any: begin=begin, end=end, version_date=version_date_dt, + multithread=False, ) result_dict = { "ts_id": ts_id, @@ -127,57 +128,56 @@ def get_timeseries( version_date: Optional[datetime] = None, trim: Optional[bool] = True, multithread: Optional[bool] = True, - max_threads: int = 20, - max_days_per_chunk: int = 30, + max_threads: Optional[int] = 20, + max_days_per_chunk: Optional[int] = 14, ) -> Data: - """Retrieves time series values from a specified time series and time window. Value date-times - obtained are always in UTC. - - Parameters - ---------- - ts_id: string - Name of the time series whose data is to be included in the response. - office_id: string - The owning office of the time series. - unit: string, optional, default is EN - The unit or unit system of the response. Defaults to EN. Valid values - for the unit field are: - 1. EN. English unit system. - 2. SI. SI unit system. - 3. Other. - datum: string, optional, default is None - The elevation datum of the response. This field affects only elevation location - levels. 
Valid values for this field are: - 1. NAVD88. - 2. NGVD29. - begin: datetime, optional, default is None - Start of the time window for data to be included in the response. If this field is - not specified, any required time window begins 24 hours prior to the specified - or default end time. Any timezone information should be passed within the datetime - object. If no timezone information is given, default will be UTC. - end: datetime, optional, default is None - End of the time window for data to be included in the response. If this field is - not specified, any required time window ends at the current time. Any timezone - information should be passed within the datetime object. If no timezone information - is given, default will be UTC. - page_size: int, optional, default is 300000: Specifies the number of records to obtain in - a single call. - version_date: datetime, optional, default is None - Version date of time series values being requested. If this field is not specified and - the timeseries is versioned, the query will return the max aggregate for the time period. - trim: boolean, optional, default is True - Specifies whether to trim missing values from the beginning and end of the retrieved values. - multithread: boolean, optional, default is False - Specifies whether to trim missing values from the beginning and end of the retrieved values. - max_threads: integer, default is 20 - The maximum number of threads that will be spawned for multithreading - max_days_per_chunk: integer, default is 30 - The maximum number of days that would be included in a thread - Returns - ------- - cwms data type. data.json will return the JSON output and data.df will return a dataframe. dates are all in UTC - """ + obtained are always in UTC. + + Parameters + ---------- + ts_id: string + Name of the time series whose data is to be included in the response. + office_id: string + The owning office of the time series. + unit: string, optional, default is EN + The unit or unit system of the response. Defaults to EN. Valid values + for the unit field are: + 1. EN. English unit system. + 2. SI. SI unit system. + 3. Other. + datum: string, optional, default is None + The elevation datum of the response. This field affects only elevation location + levels. Valid values for this field are: + 1. NAVD88. + 2. NGVD29. + begin: datetime, optional, default is None + Start of the time window for data to be included in the response. If this field is + not specified, any required time window begins 24 hours prior to the specified + or default end time. Any timezone information should be passed within the datetime + object. If no timezone information is given, default will be UTC. + end: datetime, optional, default is None + End of the time window for data to be included in the response. If this field is + not specified, any required time window ends at the current time. Any timezone + information should be passed within the datetime object. If no timezone information + is given, default will be UTC. + page_size: int, optional, default is 300000: Specifies the number of records to obtain in + a single call. + version_date: datetime, optional, default is None + Version date of time series values being requested. If this field is not specified and + the timeseries is versioned, the query will return the max aggregate for the time period. + trim: boolean, optional, default is True + Specifies whether to trim missing values from the beginning and end of the retrieved values. 
+ multithread: boolean, optional, default is False + Specifies whether to trim missing values from the beginning and end of the retrieved values. + max_threads: integer, default is 20 + The maximum number of threads that will be spawned for multithreading + max_days_per_chunk: integer, default is 30 + The maximum number of days that would be included in a thread + Returns + ------- + cwms data type. data.json will return the JSON output and data.df will return a dataframe. dates are all in UTC + """ endpoint = "timeseries" selector = "values" @@ -223,9 +223,7 @@ def _call_api_for_range(begin, end): begin_extent, end_extent, last_update = get_ts_extents( ts_id=ts_id, office_id=office_id ) - print("begin", type(begin), begin) - print("begin_extent", type(begin_extent), begin_extent) - print(begin_extent, end_extent, last_update) + if begin.replace(tzinfo=timezone.utc) < begin_extent: begin = begin_extent @@ -233,8 +231,11 @@ def _call_api_for_range(begin, end): # split into N chunks where each chunk <= max_days_per_chunk, but cap chunks to max_threads required_chunks = math.ceil(total_days / max_days_per_chunk) - print("required_chunks=", required_chunks) + chunks = min(required_chunks, max_threads) + print( + f"INFO: Getting data with {max_threads} threads. Downloading {required_chunks} required chunks." + ) if total_days <= max_days_per_chunk: response = _call_api_for_range(begin, end) @@ -388,7 +389,7 @@ def store_ts_ids( office_id=office_id, version_date=version_date, ) - store_timeseries(data=data_json) + store_timeseries(data=data_json, multithread=False) except Exception as e: print(f"Error processing {ts_id}: {e}") return None @@ -419,7 +420,14 @@ def store_ts_ids( store_ts_ids, ts_data, ts_id, office_id, version_date_dt ) -def _post_chunk_with_retries(endpoint: str, chunk_json: JSON, params: dict, max_retries: int = 3, backoff: float = 0.5): + +def _post_chunk_with_retries( + endpoint: str, + chunk_json: JSON, + params: dict, + max_retries: int = 3, + backoff: float = 0.5, +): last_exc = None for attempt in range(1, max_retries + 1): try: @@ -447,11 +455,11 @@ def store_timeseries( create_as_ltrs: Optional[bool] = False, store_rule: Optional[str] = None, override_protection: Optional[bool] = False, - multithread: bool = True, - max_threads: int = 20, - max_values_per_chunk: int = 700, - max_retries: int = 3, -) -> None: + multithread: Optional[bool] = True, + max_threads: Optional[int] = 20, + max_values_per_chunk: Optional[int] = 700, + max_retries: Optional[int] = 3, +) -> None: """Will Create new TimeSeries if not already present. 
Will store any data provided Parameters @@ -485,17 +493,17 @@ def store_timeseries( if not isinstance(data, dict): raise ValueError("Cannot store a timeseries without a JSON data dictionary") - values = data['values'] + values = data["values"] total = len(values) # small payload: single post (preserve original behavior) if (not multithread) or total <= max_values_per_chunk: - print('not multi threads***********') return api.post(endpoint, data, params) - + # determine chunking required_chunks = math.ceil(total / max_values_per_chunk) chunks = min(required_chunks, max_threads) + print(f"INFO: Storing with {chunks} threads.") # preserve metadata keys except "values" meta = {k: v for k, v in data.items() if k != "values"} @@ -514,7 +522,9 @@ def store_timeseries( responses = [None] * len(chunk_payloads) with concurrent.futures.ThreadPoolExecutor(max_workers=chunks) as executor: future_to_idx = { - executor.submit(_post_chunk_with_retries, endpoint, payload, params, max_retries): idx + executor.submit( + _post_chunk_with_retries, endpoint, payload, params, max_retries + ): idx for idx, (_, payload) in enumerate(chunk_payloads) } for fut in concurrent.futures.as_completed(future_to_idx): From 2f8803fd32cbff18d595b19c416d749bc14b0028 Mon Sep 17 00:00:00 2001 From: msweier Date: Wed, 10 Sep 2025 14:22:01 -0500 Subject: [PATCH 06/14] fix import sort --- cwms/catalog/catalog.py | 4 ++-- cwms/timeseries/timeseries.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cwms/catalog/catalog.py b/cwms/catalog/catalog.py index a6bdf70e..93cfcdbd 100644 --- a/cwms/catalog/catalog.py +++ b/cwms/catalog/catalog.py @@ -1,10 +1,10 @@ from typing import Optional +from dateutil import parser + import cwms.api as api from cwms.cwms_types import Data -from dateutil import parser - def get_locations_catalog( office_id: str, diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 4163258e..d9b05237 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -1,8 +1,8 @@ import concurrent.futures -from datetime import datetime, timedelta, timezone import math import time -from typing import Dict, Any, Optional +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional import pandas as pd from pandas import DataFrame From e986ec261d2e254264a46e8a5afe762490f9d883 Mon Sep 17 00:00:00 2001 From: msweier Date: Wed, 17 Sep 2025 08:35:55 -0500 Subject: [PATCH 07/14] add exception for get extents --- cwms/timeseries/timeseries.py | 149 ++++++++++++++++++++++++++-------- 1 file changed, 115 insertions(+), 34 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index d9b05237..5d4d38c2 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -220,12 +220,18 @@ def _call_api_for_range(begin, end): response = _call_api_for_range(begin, end) return Data(response, selector=selector) - begin_extent, end_extent, last_update = get_ts_extents( - ts_id=ts_id, office_id=office_id - ) - - if begin.replace(tzinfo=timezone.utc) < begin_extent: - begin = begin_extent + # Try to get extents for multithreading, fall back to single-threaded if it fails + try: + begin_extent, end_extent, last_update = get_ts_extents( + ts_id=ts_id, office_id=office_id + ) + if begin.replace(tzinfo=timezone.utc) < begin_extent: + begin = begin_extent + except Exception as e: + # If getting extents fails, fall back to single-threaded mode + print(f"WARNING: Could not retrieve time series extents ({e}). 
Falling back to single-threaded mode.") + response = _call_api_for_range(begin, end) + return Data(response, selector=selector) total_days = (end - begin).total_seconds() / (24 * 3600) @@ -450,13 +456,29 @@ def _post_chunk_with_retries( raise last_exc +def _get_chunk_date_range(chunk_json: JSON) -> tuple: + """Extract start and end dates from a chunk's values""" + try: + values = chunk_json.get("values", []) + if not values: + return ("No values", "No values") + + # Get first and last date-time + start_date = values[0][0] + end_date = values[-1][0] + + return (start_date, end_date) + except Exception: + return ("Error parsing dates", "Error parsing dates") + + def store_timeseries( data: JSON, create_as_ltrs: Optional[bool] = False, store_rule: Optional[str] = None, override_protection: Optional[bool] = False, multithread: Optional[bool] = True, - max_threads: Optional[int] = 20, + max_threads: Optional[int] = 25, max_values_per_chunk: Optional[int] = 700, max_retries: Optional[int] = 3, ) -> None: @@ -502,49 +524,108 @@ def store_timeseries( # determine chunking required_chunks = math.ceil(total / max_values_per_chunk) - chunks = min(required_chunks, max_threads) - print(f"INFO: Storing with {chunks} threads.") + + # process in batches of max_threads + print(f"INFO: Need {required_chunks} chunks of ~{max_values_per_chunk} values each") + # print(f"INFO: Will process in batches of {max_threads} threads") # preserve metadata keys except "values" meta = {k: v for k, v in data.items() if k != "values"} - # build chunk payloads (roughly equal sized) - chunk_payloads = [] - for i in range(chunks): - start = int(math.floor(i * total / chunks)) - end = int(math.floor((i + 1) * total / chunks)) if i < (chunks - 1) else total + # Build chunk payloads first + all_chunk_payloads = [] + for i in range(required_chunks): + start = i * max_values_per_chunk + end = min(start + max_values_per_chunk, total) chunk_vals = values[start:end] chunk_json = dict(meta) chunk_json["values"] = chunk_vals - chunk_payloads.append((i, chunk_json)) - - # post chunks in parallel and collect responses in order - responses = [None] * len(chunk_payloads) - with concurrent.futures.ThreadPoolExecutor(max_workers=chunks) as executor: - future_to_idx = { - executor.submit( - _post_chunk_with_retries, endpoint, payload, params, max_retries - ): idx - for idx, (_, payload) in enumerate(chunk_payloads) - } - for fut in concurrent.futures.as_completed(future_to_idx): - idx = future_to_idx[fut] - responses[idx] = fut.result() # will raise if chunk failed after retries - - # after collecting responses list as above, responses is list of dicts per chunk + all_chunk_payloads.append((i, chunk_json)) + + # print(f"INFO: Created {len(all_chunk_payloads)} chunks, sizes: {[len(payload[1]['values']) for payload in all_chunk_payloads[:5]]}...") + + # Process chunks in batches of max_threads + all_responses = [None] * len(all_chunk_payloads) + + for batch_start in range(0, len(all_chunk_payloads), max_threads): + batch_end = min(batch_start + max_threads, len(all_chunk_payloads)) + batch_chunks = all_chunk_payloads[batch_start:batch_end] + + # print(f"INFO: Processing batch {batch_start//max_threads + 1}: chunks {batch_start} to {batch_end-1}") + + # Process this batch with ThreadPoolExecutor + with concurrent.futures.ThreadPoolExecutor(max_workers=len(batch_chunks)) as executor: + future_to_idx = { + executor.submit( + _post_chunk_with_retries, endpoint, payload, params, max_retries + ): batch_start + local_idx + for local_idx, (_, payload) in 
enumerate(batch_chunks) + } + + for fut in concurrent.futures.as_completed(future_to_idx): + global_idx = future_to_idx[fut] + try: + all_responses[global_idx] = fut.result() + except Exception as e: + # Get the chunk that failed and extract date range + chunk_json = all_chunk_payloads[global_idx][1] + start_date, end_date = _get_chunk_date_range(chunk_json) + chunk_size = len(chunk_json["values"]) + print(f"ERROR: Chunk {global_idx} failed (size: {chunk_size}, dates: {start_date} to {end_date}): {e}") + raise + + # Check for errors errors = [] - for idx, result in enumerate(responses): + for idx, result in enumerate(all_responses): if not result or not result.get("ok"): - errors.append((idx, result)) + chunk_json = all_chunk_payloads[idx][1] + start_date, end_date = _get_chunk_date_range(chunk_json) + chunk_size = len(chunk_json["values"]) + status = result.get("status") if result else "No response" + + error_info = { + "chunk_index": idx, + "chunk_size": chunk_size, + "start_date": start_date, + "end_date": end_date, + "status": status, + "result": result + } + errors.append(error_info) + print(f"ERROR: Chunk {idx} failed - Size: {chunk_size}, Dates: {start_date} to {end_date}, Status: {status}") + + if errors: + error_summary = [] + for error in errors: + error_summary.append( + f"Chunk {error['chunk_index']} (size: {error['chunk_size']}, {error['start_date']} to {error['end_date']}): Status {error['status']}" + ) + + error_message = f"Failed chunks:\n" + "\n".join(error_summary) + print(f"SUMMARY: {len(errors)} chunks failed out of {len(all_chunk_payloads)} total") + raise RuntimeError(error_message) + + print(f"INFO: Store success - All {len(all_chunk_payloads)} chunks completed successfully") + return all_responses + if errors: - raise RuntimeError(f"Some chunks failed: {errors}") + # Create a more detailed error message + error_summary = [] + for error in errors: + error_summary.append( + f"Chunk {error['chunk_index']} ({error['start_date']} to {error['end_date']}): Status {error['status']}" + ) + + error_message = f"Failed chunks:\n" + "\n".join(error_summary) + print(f"SUMMARY: {len(errors)} chunks failed out of {len(chunk_payloads)} total") + raise RuntimeError(error_message) return responses def delete_timeseries( ts_id: str, - office_id: str, + office_id: str, begin: datetime, end: datetime, version_date: Optional[datetime] = None, From 7d28009ddfc0134002ce7d5cd94490832f7a8f25 Mon Sep 17 00:00:00 2001 From: msweier Date: Wed, 17 Sep 2025 14:43:51 -0500 Subject: [PATCH 08/14] fix typing --- cwms/catalog/catalog.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cwms/catalog/catalog.py b/cwms/catalog/catalog.py index 93cfcdbd..5fe42c53 100644 --- a/cwms/catalog/catalog.py +++ b/cwms/catalog/catalog.py @@ -1,4 +1,5 @@ -from typing import Optional +from datetime import datetime +from typing import Optional, Tuple from dateutil import parser @@ -134,7 +135,7 @@ def get_timeseries_catalog( return Data(response, selector="entries") -def get_ts_extents(ts_id: str, office_id: str): +def get_ts_extents(ts_id: str, office_id: str) -> Tuple[datetime, datetime, datetime]: """Retrieves earliest extent, latest extent, and last update via cwms.get_timeseries_catalog Parameters From 9aac5ba96a63b4e0451cac0e25dcdb87e8535eb1 Mon Sep 17 00:00:00 2001 From: msweier Date: Wed, 17 Sep 2025 14:44:22 -0500 Subject: [PATCH 09/14] fix typing --- cwms/timeseries/timeseries.py | 142 +++++++++++++++++----------------- 1 file changed, 69 insertions(+), 73 deletions(-) diff --git 
a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 5d4d38c2..c05ce40e 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -2,7 +2,7 @@ import math import time from datetime import datetime, timedelta, timezone -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional, Tuple import pandas as pd from pandas import DataFrame @@ -128,8 +128,8 @@ def get_timeseries( version_date: Optional[datetime] = None, trim: Optional[bool] = True, multithread: Optional[bool] = True, - max_threads: Optional[int] = 20, - max_days_per_chunk: Optional[int] = 14, + max_threads: int = 20, + max_days_per_chunk: int = 14, ) -> Data: """Retrieves time series values from a specified time series and time window. Value date-times obtained are always in UTC. @@ -168,7 +168,7 @@ def get_timeseries( the timeseries is versioned, the query will return the max aggregate for the time period. trim: boolean, optional, default is True Specifies whether to trim missing values from the beginning and end of the retrieved values. - multithread: boolean, optional, default is False + multithread: boolean, optional, default is True Specifies whether to trim missing values from the beginning and end of the retrieved values. max_threads: integer, default is 20 The maximum number of threads that will be spawned for multithreading @@ -200,7 +200,7 @@ def get_timeseries( else: begin = begin.replace(tzinfo=timezone.utc) - def _call_api_for_range(begin, end): + def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: params = { "office": office_id, "name": ts_id, @@ -213,7 +213,10 @@ def _call_api_for_range(begin, end): "version-date": version_date.isoformat() if version_date else None, "trim": trim, } - return api.get_with_paging(selector=selector, endpoint=endpoint, params=params) + response = api.get_with_paging( + selector=selector, endpoint=endpoint, params=params + ) + return dict(response) # if multithread disabled or short range, do a single call if not multithread: @@ -229,7 +232,9 @@ def _call_api_for_range(begin, end): begin = begin_extent except Exception as e: # If getting extents fails, fall back to single-threaded mode - print(f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode.") + print( + f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode." 
+ ) response = _call_api_for_range(begin, end) return Data(response, selector=selector) @@ -249,16 +254,16 @@ def _call_api_for_range(begin, end): # create roughly equal ranges chunk_seconds = (end - begin).total_seconds() / chunks - ranges = [] + ranges: List[Tuple[datetime, datetime]] = [] for i in range(chunks): - b = begin + timedelta(seconds=math.floor(i * chunk_seconds)) - e = begin + timedelta(seconds=math.floor((i + 1) * chunk_seconds)) + b_chunk = begin + timedelta(seconds=math.floor(i * chunk_seconds)) + e_chunk = begin + timedelta(seconds=math.floor((i + 1) * chunk_seconds)) if i == chunks - 1: - e = end - ranges.append((b, e)) + e_chunk = end + ranges.append((b_chunk, e_chunk)) # perform parallel requests - responses = [None] * len(ranges) + responses: List[Optional[Dict[str, Any]]] = [None] * len(ranges) with concurrent.futures.ThreadPoolExecutor(max_workers=chunks) as executor: future_to_idx = { executor.submit(_call_api_for_range, r[0], r[1]): idx @@ -276,13 +281,13 @@ def _call_api_for_range(begin, end): sorted_responses = [resp for resp in responses if resp is not None] # Merge JSON "values" lists (assumes top-level "values" list present) - merged_json = {} + merged_json: Dict[str, Any] = {} # merge metadata from first response (you can adjust which metadata to prefer) if sorted_responses: merged_json.update( {k: v for k, v in sorted_responses[0].items() if k != "values"} ) - merged_values = [] + merged_values: List[Any] = [] for resp in sorted_responses: vals = resp.get("values") or [] merged_values.extend(vals) @@ -430,43 +435,31 @@ def store_ts_ids( def _post_chunk_with_retries( endpoint: str, chunk_json: JSON, - params: dict, + params: Dict[str, Any], max_retries: int = 3, backoff: float = 0.5, -): - last_exc = None +) -> None: for attempt in range(1, max_retries + 1): try: - resp = api.post(endpoint, chunk_json, params) - # if api.post returns a requests.Response-like object: - status = getattr(resp, "status_code", None) - body = getattr(resp, "json", lambda: None)() - if status is not None: - if 200 <= status < 300: - return {"ok": True, "status": status, "body": body} - # treat 409/4xx specially if you want merges to proceed - return {"ok": False, "status": status, "body": body} - # otherwise assume success if no exception - return {"ok": True, "status": None, "body": resp} + api.post(endpoint, chunk_json, params) + return except Exception as e: - last_exc = e if attempt == max_retries: raise time.sleep(backoff * (2 ** (attempt - 1))) - raise last_exc -def _get_chunk_date_range(chunk_json: JSON) -> tuple: +def _get_chunk_date_range(chunk_json: JSON) -> Tuple[str, str]: """Extract start and end dates from a chunk's values""" try: values = chunk_json.get("values", []) if not values: return ("No values", "No values") - + # Get first and last date-time start_date = values[0][0] end_date = values[-1][0] - + return (start_date, end_date) except Exception: return ("Error parsing dates", "Error parsing dates") @@ -478,10 +471,10 @@ def store_timeseries( store_rule: Optional[str] = None, override_protection: Optional[bool] = False, multithread: Optional[bool] = True, - max_threads: Optional[int] = 25, - max_values_per_chunk: Optional[int] = 700, - max_retries: Optional[int] = 3, -) -> None: + max_threads: int = 20, + max_values_per_chunk: int = 700, + max_retries: int = 3, +) -> Optional[List[Dict[str, Any] | None]]: """Will Create new TimeSeries if not already present. Will store any data provided Parameters @@ -499,6 +492,10 @@ def store_timeseries( DELETE_INSERT. 
override_protection: str, optional, default is False A flag to ignore the protected data quality when storing data. + multithread: bool, default is true + max_threads: int, maximum numbers of threads + max_values_per_chunk: int, maximum values that will be saved by a thread + max_retries: int, maximum number of store retries that will be attempted Returns ------- @@ -518,13 +515,14 @@ def store_timeseries( values = data["values"] total = len(values) - # small payload: single post (preserve original behavior) + # small payload: single post if (not multithread) or total <= max_values_per_chunk: - return api.post(endpoint, data, params) + api.post(endpoint, data, params) + return None # determine chunking required_chunks = math.ceil(total / max_values_per_chunk) - + # process in batches of max_threads print(f"INFO: Need {required_chunks} chunks of ~{max_values_per_chunk} values each") # print(f"INFO: Will process in batches of {max_threads} threads") @@ -541,37 +539,43 @@ def store_timeseries( chunk_json = dict(meta) chunk_json["values"] = chunk_vals all_chunk_payloads.append((i, chunk_json)) - + # print(f"INFO: Created {len(all_chunk_payloads)} chunks, sizes: {[len(payload[1]['values']) for payload in all_chunk_payloads[:5]]}...") # Process chunks in batches of max_threads - all_responses = [None] * len(all_chunk_payloads) - + all_responses: List[Optional[Dict[str, Any]]] = [None] * len(all_chunk_payloads) + for batch_start in range(0, len(all_chunk_payloads), max_threads): batch_end = min(batch_start + max_threads, len(all_chunk_payloads)) batch_chunks = all_chunk_payloads[batch_start:batch_end] - + # print(f"INFO: Processing batch {batch_start//max_threads + 1}: chunks {batch_start} to {batch_end-1}") - + # Process this batch with ThreadPoolExecutor - with concurrent.futures.ThreadPoolExecutor(max_workers=len(batch_chunks)) as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=len(batch_chunks) + ) as executor: future_to_idx = { executor.submit( _post_chunk_with_retries, endpoint, payload, params, max_retries - ): batch_start + local_idx + ): batch_start + + local_idx for local_idx, (_, payload) in enumerate(batch_chunks) } - + for fut in concurrent.futures.as_completed(future_to_idx): global_idx = future_to_idx[fut] try: - all_responses[global_idx] = fut.result() + result = fut.result() + all_responses[global_idx] = {"ok": True, "status": 200} except Exception as e: # Get the chunk that failed and extract date range chunk_json = all_chunk_payloads[global_idx][1] start_date, end_date = _get_chunk_date_range(chunk_json) chunk_size = len(chunk_json["values"]) - print(f"ERROR: Chunk {global_idx} failed (size: {chunk_size}, dates: {start_date} to {end_date}): {e}") + print( + f"ERROR: Chunk {global_idx} failed (size: {chunk_size}, dates: {start_date} to {end_date}): {e}" + ) raise # Check for errors @@ -582,50 +586,42 @@ def store_timeseries( start_date, end_date = _get_chunk_date_range(chunk_json) chunk_size = len(chunk_json["values"]) status = result.get("status") if result else "No response" - + error_info = { "chunk_index": idx, "chunk_size": chunk_size, "start_date": start_date, "end_date": end_date, "status": status, - "result": result + "result": result, } errors.append(error_info) - print(f"ERROR: Chunk {idx} failed - Size: {chunk_size}, Dates: {start_date} to {end_date}, Status: {status}") - - if errors: - error_summary = [] - for error in errors: - error_summary.append( - f"Chunk {error['chunk_index']} (size: {error['chunk_size']}, {error['start_date']} to 
{error['end_date']}): Status {error['status']}" + print( + f"ERROR: Chunk {idx} failed - Size: {chunk_size}, Dates: {start_date} to {end_date}, Status: {status}" ) - - error_message = f"Failed chunks:\n" + "\n".join(error_summary) - print(f"SUMMARY: {len(errors)} chunks failed out of {len(all_chunk_payloads)} total") - raise RuntimeError(error_message) - print(f"INFO: Store success - All {len(all_chunk_payloads)} chunks completed successfully") - return all_responses - if errors: - # Create a more detailed error message error_summary = [] for error in errors: error_summary.append( - f"Chunk {error['chunk_index']} ({error['start_date']} to {error['end_date']}): Status {error['status']}" + f"Chunk {error['chunk_index']} (size: {error['chunk_size']}, {error['start_date']} to {error['end_date']}): Status {error['status']}" ) - + error_message = f"Failed chunks:\n" + "\n".join(error_summary) - print(f"SUMMARY: {len(errors)} chunks failed out of {len(chunk_payloads)} total") + print( + f"SUMMARY: {len(errors)} chunks failed out of {len(all_chunk_payloads)} total" + ) raise RuntimeError(error_message) - return responses + print( + f"INFO: Store success - All {len(all_chunk_payloads)} chunks completed successfully" + ) + return all_responses def delete_timeseries( ts_id: str, - office_id: str, + office_id: str, begin: datetime, end: datetime, version_date: Optional[datetime] = None, From 0d3a5bc3865c3e4f624315536a4e2d5627abf851 Mon Sep 17 00:00:00 2001 From: msweier Date: Wed, 17 Sep 2025 15:04:24 -0500 Subject: [PATCH 10/14] change date parser to pandas --- cwms/catalog/catalog.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/cwms/catalog/catalog.py b/cwms/catalog/catalog.py index 5fe42c53..e5f37afb 100644 --- a/cwms/catalog/catalog.py +++ b/cwms/catalog/catalog.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import Optional, Tuple -from dateutil import parser +import pandas as pd import cwms.api as api from cwms.cwms_types import Data @@ -156,13 +156,11 @@ def get_ts_extents(ts_id: str, office_id: str) -> Tuple[datetime, datetime, date page_size=500, include_extents=True, ).df - earliest_time = parser.isoparse( - cwms_cat[cwms_cat.name == ts_id].extents.values[0][0]["earliest-time"] - ) - latest_time = parser.isoparse( - cwms_cat[cwms_cat.name == ts_id].extents.values[0][0]["latest-time"] - ) - last_update = parser.isoparse( - cwms_cat[cwms_cat.name == ts_id].extents.values[0][0]["last-update"] - ) + + times = cwms_cat[cwms_cat.name == ts_id].extents.values[0][0] + + earliest_time = pd.to_datetime(times['earliest-time']) + latest_time = pd.to_datetime(times['latest-time']) + last_update = pd.to_datetime(times['last-update']) + return earliest_time, latest_time, last_update From 64d3bb103deba4a4f475dce01a17e31bc6e3a3c2 Mon Sep 17 00:00:00 2001 From: msweier Date: Wed, 17 Sep 2025 15:15:03 -0500 Subject: [PATCH 11/14] update format --- cwms/catalog/catalog.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cwms/catalog/catalog.py b/cwms/catalog/catalog.py index e5f37afb..8e0796d2 100644 --- a/cwms/catalog/catalog.py +++ b/cwms/catalog/catalog.py @@ -159,8 +159,8 @@ def get_ts_extents(ts_id: str, office_id: str) -> Tuple[datetime, datetime, date times = cwms_cat[cwms_cat.name == ts_id].extents.values[0][0] - earliest_time = pd.to_datetime(times['earliest-time']) - latest_time = pd.to_datetime(times['latest-time']) - last_update = pd.to_datetime(times['last-update']) + earliest_time = pd.to_datetime(times["earliest-time"]) 
+ latest_time = pd.to_datetime(times["latest-time"]) + last_update = pd.to_datetime(times["last-update"]) return earliest_time, latest_time, last_update From 2fea5d8fcbbc84b556f2ce8efea7e0365ada6203 Mon Sep 17 00:00:00 2001 From: msweier Date: Thu, 18 Sep 2025 11:45:23 -0500 Subject: [PATCH 12/14] clean up store_timeseries --- cwms/timeseries/timeseries.py | 160 ++++++++++++++++------------------ 1 file changed, 76 insertions(+), 84 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index c05ce40e..afbb9607 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -128,7 +128,7 @@ def get_timeseries( version_date: Optional[datetime] = None, trim: Optional[bool] = True, multithread: Optional[bool] = True, - max_threads: int = 20, + max_threads: int = 30, max_days_per_chunk: int = 14, ) -> Data: """Retrieves time series values from a specified time series and time window. Value date-times @@ -170,7 +170,7 @@ def get_timeseries( Specifies whether to trim missing values from the beginning and end of the retrieved values. multithread: boolean, optional, default is True Specifies whether to trim missing values from the beginning and end of the retrieved values. - max_threads: integer, default is 20 + max_threads: integer, default is 30 The maximum number of threads that will be spawned for multithreading max_days_per_chunk: integer, default is 30 The maximum number of days that would be included in a thread @@ -179,26 +179,13 @@ def get_timeseries( cwms data type. data.json will return the JSON output and data.df will return a dataframe. dates are all in UTC """ - endpoint = "timeseries" - selector = "values" - - if begin and not isinstance(begin, datetime): - raise ValueError("begin needs to be in datetime") - if end and not isinstance(end, datetime): - raise ValueError("end needs to be in datetime") - if version_date and not isinstance(version_date, datetime): - raise ValueError("version_date needs to be in datetime") - - # default end to now if not provided - if end is None: - end = datetime.now(timezone.utc) - else: - end = end.replace(tzinfo=timezone.utc) - if begin is None: - # keep original behavior: default window begins 24 hours prior to end - begin = end - timedelta(days=1) - else: - begin = begin.replace(tzinfo=timezone.utc) + def _ensure_utc_datetime(dt: Optional[datetime]) -> Optional[datetime]: + """Convert datetime to UTC, preserving None values.""" + if dt is None: + return None + if not isinstance(dt, datetime): + raise ValueError(f"Expected datetime object, got {type(dt)}") + return dt.replace(tzinfo=timezone.utc) if dt.tzinfo is None else dt.astimezone(timezone.utc) def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: params = { @@ -217,26 +204,30 @@ def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: selector=selector, endpoint=endpoint, params=params ) return dict(response) + + endpoint = "timeseries" + selector = "values" - # if multithread disabled or short range, do a single call - if not multithread: - response = _call_api_for_range(begin, end) - return Data(response, selector=selector) + # validate begin and end times or default to 1 day lookback + end = _ensure_utc_datetime(end) or datetime.now(timezone.utc) + begin = _ensure_utc_datetime(begin) or (end - timedelta(days=1)) + version_date = _ensure_utc_datetime(version_date) # Try to get extents for multithreading, fall back to single-threaded if it fails - try: - begin_extent, end_extent, last_update = 
get_ts_extents( - ts_id=ts_id, office_id=office_id - ) - if begin.replace(tzinfo=timezone.utc) < begin_extent: - begin = begin_extent - except Exception as e: - # If getting extents fails, fall back to single-threaded mode - print( - f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode." - ) - response = _call_api_for_range(begin, end) - return Data(response, selector=selector) + if multithread: + try: + begin_extent, end_extent, last_update = get_ts_extents( + ts_id=ts_id, office_id=office_id + ) + if begin.replace(tzinfo=timezone.utc) < begin_extent: + begin = begin_extent + except Exception as e: + # If getting extents fails, fall back to single-threaded mode + print( + f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode." + ) + response = _call_api_for_range(begin, end) + return Data(response, selector=selector) total_days = (end - begin).total_seconds() / (24 * 3600) @@ -244,14 +235,16 @@ def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: required_chunks = math.ceil(total_days / max_days_per_chunk) chunks = min(required_chunks, max_threads) - print( - f"INFO: Getting data with {max_threads} threads. Downloading {required_chunks} required chunks." - ) - if total_days <= max_days_per_chunk: + # if multithread is off or if you can get the data in just one chunk, use single thread + if chunks == 1 or not multithread: response = _call_api_for_range(begin, end) return Data(response, selector=selector) + print( + f"INFO: Getting data with {chunks} threads. Downloading {required_chunks} required chunks." + ) + # create roughly equal ranges chunk_seconds = (end - begin).total_seconds() / chunks ranges: List[Tuple[datetime, datetime]] = [] @@ -282,7 +275,7 @@ def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: # Merge JSON "values" lists (assumes top-level "values" list present) merged_json: Dict[str, Any] = {} - # merge metadata from first response (you can adjust which metadata to prefer) + # merge metadata from first response if sorted_responses: merged_json.update( {k: v for k, v in sorted_responses[0].items() if k != "values"} @@ -291,7 +284,7 @@ def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: for resp in sorted_responses: vals = resp.get("values") or [] merged_values.extend(vals) - # optionally deduplicate by date-time + # try and deduplicate by date-time try: # preserve order and dedupe by date-time string seen = set() @@ -303,6 +296,7 @@ def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: deduped.append(v) merged_json["values"] = deduped except Exception: + # in case the dedup fails, just try and store it all merged_json["values"] = merged_values else: merged_json["values"] = [] @@ -432,46 +426,13 @@ def store_ts_ids( ) -def _post_chunk_with_retries( - endpoint: str, - chunk_json: JSON, - params: Dict[str, Any], - max_retries: int = 3, - backoff: float = 0.5, -) -> None: - for attempt in range(1, max_retries + 1): - try: - api.post(endpoint, chunk_json, params) - return - except Exception as e: - if attempt == max_retries: - raise - time.sleep(backoff * (2 ** (attempt - 1))) - - -def _get_chunk_date_range(chunk_json: JSON) -> Tuple[str, str]: - """Extract start and end dates from a chunk's values""" - try: - values = chunk_json.get("values", []) - if not values: - return ("No values", "No values") - - # Get first and last date-time - start_date = values[0][0] - end_date = values[-1][0] - - return (start_date, 
end_date) - except Exception: - return ("Error parsing dates", "Error parsing dates") - - def store_timeseries( data: JSON, create_as_ltrs: Optional[bool] = False, store_rule: Optional[str] = None, override_protection: Optional[bool] = False, multithread: Optional[bool] = True, - max_threads: int = 20, + max_threads: int = 30, max_values_per_chunk: int = 700, max_retries: int = 3, ) -> Optional[List[Dict[str, Any] | None]]: @@ -501,7 +462,39 @@ def store_timeseries( ------- response """ + def _post_chunk_with_retries( + endpoint: str, + chunk_json: JSON, + params: Dict[str, Any], + max_retries: int = 3, + backoff: float = 0.5, + ) -> None: + for attempt in range(1, max_retries + 1): + try: + api.post(endpoint, chunk_json, params) + return + except Exception as e: + if attempt == max_retries: + raise + time.sleep(backoff * (2 ** (attempt - 1))) + + def _get_chunk_date_range(chunk_json: JSON) -> Tuple[str, str]: + """Extract start and end dates from a chunk's values to help if something fails""" + try: + values = chunk_json.get("values", []) + if not values: + return ("No values", "No values") + + # Get first and last date-time + start_date = values[0][0] + end_date = values[-1][0] + + return (start_date, end_date) + except Exception: + return ("Error parsing dates", "Error parsing dates") + + endpoint = "timeseries" params = { "create-as-lrts": create_as_ltrs, @@ -515,7 +508,7 @@ def store_timeseries( values = data["values"] total = len(values) - # small payload: single post + # if you can just do a single post if (not multithread) or total <= max_values_per_chunk: api.post(endpoint, data, params) return None @@ -523,9 +516,10 @@ def store_timeseries( # determine chunking required_chunks = math.ceil(total / max_values_per_chunk) + threads = min(required_chunks, max_threads) + # process in batches of max_threads - print(f"INFO: Need {required_chunks} chunks of ~{max_values_per_chunk} values each") - # print(f"INFO: Will process in batches of {max_threads} threads") + print(f"INFO: Downloading {required_chunks} with {threads} threads") # preserve metadata keys except "values" meta = {k: v for k, v in data.items() if k != "values"} @@ -540,8 +534,6 @@ def store_timeseries( chunk_json["values"] = chunk_vals all_chunk_payloads.append((i, chunk_json)) - # print(f"INFO: Created {len(all_chunk_payloads)} chunks, sizes: {[len(payload[1]['values']) for payload in all_chunk_payloads[:5]]}...") - # Process chunks in batches of max_threads all_responses: List[Optional[Dict[str, Any]]] = [None] * len(all_chunk_payloads) From 2125b6fe72e8d774afb0140d14214532c2382e2f Mon Sep 17 00:00:00 2001 From: msweier Date: Thu, 18 Sep 2025 13:46:49 -0500 Subject: [PATCH 13/14] clean up store_timeseries --- cwms/timeseries/timeseries.py | 216 +++++++++++++--------------------- 1 file changed, 85 insertions(+), 131 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index afbb9607..7ed0a23b 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -185,7 +185,11 @@ def _ensure_utc_datetime(dt: Optional[datetime]) -> Optional[datetime]: return None if not isinstance(dt, datetime): raise ValueError(f"Expected datetime object, got {type(dt)}") - return dt.replace(tzinfo=timezone.utc) if dt.tzinfo is None else dt.astimezone(timezone.utc) + return ( + dt.replace(tzinfo=timezone.utc) + if dt.tzinfo is None + else dt.astimezone(timezone.utc) + ) def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: params = { @@ -204,7 +208,7 @@ def 
_call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: selector=selector, endpoint=endpoint, params=params ) return dict(response) - + endpoint = "timeseries" selector = "values" @@ -435,7 +439,7 @@ def store_timeseries( max_threads: int = 30, max_values_per_chunk: int = 700, max_retries: int = 3, -) -> Optional[List[Dict[str, Any] | None]]: +) -> Optional[List[Dict[str, Any]]]: """Will Create new TimeSeries if not already present. Will store any data provided Parameters @@ -462,39 +466,6 @@ def store_timeseries( ------- response """ - def _post_chunk_with_retries( - endpoint: str, - chunk_json: JSON, - params: Dict[str, Any], - max_retries: int = 3, - backoff: float = 0.5, - ) -> None: - for attempt in range(1, max_retries + 1): - try: - api.post(endpoint, chunk_json, params) - return - except Exception as e: - if attempt == max_retries: - raise - time.sleep(backoff * (2 ** (attempt - 1))) - - - def _get_chunk_date_range(chunk_json: JSON) -> Tuple[str, str]: - """Extract start and end dates from a chunk's values to help if something fails""" - try: - values = chunk_json.get("values", []) - if not values: - return ("No values", "No values") - - # Get first and last date-time - start_date = values[0][0] - end_date = values[-1][0] - - return (start_date, end_date) - except Exception: - return ("Error parsing dates", "Error parsing dates") - - endpoint = "timeseries" params = { "create-as-lrts": create_as_ltrs, @@ -502,113 +473,96 @@ def _get_chunk_date_range(chunk_json: JSON) -> Tuple[str, str]: "override-protection": override_protection, } - if not isinstance(data, dict): - raise ValueError("Cannot store a timeseries without a JSON data dictionary") - - values = data["values"] - total = len(values) - - # if you can just do a single post - if (not multithread) or total <= max_values_per_chunk: - api.post(endpoint, data, params) - return None - - # determine chunking - required_chunks = math.ceil(total / max_values_per_chunk) - - threads = min(required_chunks, max_threads) + def _store_single_chunk(chunk_data: JSON, attempt: int = 1) -> Dict[str, Any]: + """Store a single chunk with retry logic.""" + try: + api.post(endpoint, chunk_data, params) + return {"success": True, "attempt": attempt} + except Exception as e: + if attempt >= max_retries: + raise + time.sleep(0.5 * (2 ** (attempt - 1))) # Exponential backoff + return _store_single_chunk(chunk_data, attempt + 1) + + def _create_chunks(values: List[Any], metadata: Dict[str, Any]) -> List[JSON]: + """Split values into chunks and create payloads.""" + chunks = [] + total_values = len(values) + + for i in range(0, total_values, max_values_per_chunk): + chunk_values = values[i : i + max_values_per_chunk] + chunk_payload = dict(metadata) + chunk_payload["values"] = chunk_values + chunks.append(chunk_payload) + + return chunks + + def _store_chunks_parallel(chunks: List[JSON]) -> List[Dict[str, Any]]: + """Store chunks using thread pool.""" + with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: + # Submit all chunks + future_to_chunk = { + executor.submit(_store_single_chunk, chunk): idx + for idx, chunk in enumerate(chunks) + } - # process in batches of max_threads - print(f"INFO: Downloading {required_chunks} with {threads} threads") + # Use a dictionary to collect results, then convert to list + chunk_results_dict: Dict[int, Dict[str, Any]] = {} - # preserve metadata keys except "values" - meta = {k: v for k, v in data.items() if k != "values"} + for future in 
concurrent.futures.as_completed(future_to_chunk):
+                chunk_idx = future_to_chunk[future]
+                try:
+                    chunk_results_dict[chunk_idx] = future.result()
+                except Exception as e:
+                    chunk = chunks[chunk_idx]
+                    start_date, end_date = _get_date_range(chunk)
+                    print(
+                        f"ERROR: Chunk {chunk_idx} failed ({start_date} to {end_date}): {e}"
+                    )
+                    raise
 
-    # Build chunk payloads first
-    all_chunk_payloads = []
-    for i in range(required_chunks):
-        start = i * max_values_per_chunk
-        end = min(start + max_values_per_chunk, total)
-        chunk_vals = values[start:end]
-        chunk_json = dict(meta)
-        chunk_json["values"] = chunk_vals
-        all_chunk_payloads.append((i, chunk_json))
+        # Convert to ordered list
+        return [chunk_results_dict[i] for i in range(len(chunks))]
 
-    # Process chunks in batches of max_threads
-    all_responses: List[Optional[Dict[str, Any]]] = [None] * len(all_chunk_payloads)
+    def _get_date_range(chunk: JSON) -> Tuple[str, str]:
+        """Extract date range from chunk for error reporting."""
+        try:
+            values = chunk.get("values", [])
+            if not values:
+                return ("No values", "No values")
+            return (values[0][0], values[-1][0])
+        except Exception:
+            return ("Error parsing dates", "Error parsing dates")
 
-    for batch_start in range(0, len(all_chunk_payloads), max_threads):
-        batch_end = min(batch_start + max_threads, len(all_chunk_payloads))
-        batch_chunks = all_chunk_payloads[batch_start:batch_end]
+    # Validate input
+    if not isinstance(data, dict):
+        raise ValueError("Data must be a JSON dictionary")
 
-        # print(f"INFO: Processing batch {batch_start//max_threads + 1}: chunks {batch_start} to {batch_end-1}")
+    values = data.get("values", [])
+    if not values:
+        raise ValueError("No values to store")
 
-        # Process this batch with ThreadPoolExecutor
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=len(batch_chunks)
-        ) as executor:
-            future_to_idx = {
-                executor.submit(
-                    _post_chunk_with_retries, endpoint, payload, params, max_retries
-                ): batch_start
-                + local_idx
-                for local_idx, (_, payload) in enumerate(batch_chunks)
-            }
+    # Post with one thread for small datasets
+    if not multithread or len(values) <= max_values_per_chunk:
+        api.post(endpoint, data, params)
+        return None
 
-            for fut in concurrent.futures.as_completed(future_to_idx):
-                global_idx = future_to_idx[fut]
-                try:
-                    result = fut.result()
-                    all_responses[global_idx] = {"ok": True, "status": 200}
-                except Exception as e:
-                    # Get the chunk that failed and extract date range
-                    chunk_json = all_chunk_payloads[global_idx][1]
-                    start_date, end_date = _get_chunk_date_range(chunk_json)
-                    chunk_size = len(chunk_json["values"])
-                    print(
-                        f"ERROR: Chunk {global_idx} failed (size: {chunk_size}, dates: {start_date} to {end_date}): {e}"
-                    )
-                    raise
+    # Multi-threaded path: chunk the values and post the chunks in parallel
+    threads = min(math.ceil(len(values) / max_values_per_chunk), max_threads)
+    print(f"INFO: Storing {len(values)} values with {threads} threads")
 
-    # Check for errors
-    errors = []
-    for idx, result in enumerate(all_responses):
-        if not result or not result.get("ok"):
-            chunk_json = all_chunk_payloads[idx][1]
-            start_date, end_date = _get_chunk_date_range(chunk_json)
-            chunk_size = len(chunk_json["values"])
-            status = result.get("status") if result else "No response"
-
-            error_info = {
-                "chunk_index": idx,
-                "chunk_size": chunk_size,
-                "start_date": start_date,
-                "end_date": end_date,
-                "status": status,
-                "result": result,
-            }
-            errors.append(error_info)
-            print(
-                f"ERROR: Chunk {idx} failed - Size: {chunk_size}, Dates: {start_date} to {end_date}, Status: {status}"
-            )
+    # Separate metadata from values
+ metadata = {k: v for k, v in data.items() if k != "values"} - if errors: - error_summary = [] - for error in errors: - error_summary.append( - f"Chunk {error['chunk_index']} (size: {error['chunk_size']}, {error['start_date']} to {error['end_date']}): Status {error['status']}" - ) + # Create chunks + chunks = _create_chunks(values, metadata) + print(f"INFO: Created {len(chunks)} chunks, using up to {max_threads} threads") - error_message = f"Failed chunks:\n" + "\n".join(error_summary) - print( - f"SUMMARY: {len(errors)} chunks failed out of {len(all_chunk_payloads)} total" - ) - raise RuntimeError(error_message) + # Store chunks in parallel + results = _store_chunks_parallel(chunks) - print( - f"INFO: Store success - All {len(all_chunk_payloads)} chunks completed successfully" - ) - return all_responses + print(f"SUCCESS: All {len(chunks)} chunks stored successfully") + return results def delete_timeseries( From 8b69ea8caee69de520c5e58b7fd68fb42b2ca6b8 Mon Sep 17 00:00:00 2001 From: msweier Date: Fri, 19 Sep 2025 08:38:24 -0500 Subject: [PATCH 14/14] fix get_timeseries --- cwms/timeseries/timeseries.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 7ed0a23b..8e87c496 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -238,30 +238,33 @@ def _call_api_for_range(begin: datetime, end: datetime) -> Dict[str, Any]: # split into N chunks where each chunk <= max_days_per_chunk, but cap chunks to max_threads required_chunks = math.ceil(total_days / max_days_per_chunk) - chunks = min(required_chunks, max_threads) + # Limit the number of threads to max_threads + actual_threads = min(required_chunks, max_threads) # if multithread is off or if you can get the data in just one chunk, use single thread - if chunks == 1 or not multithread: + if required_chunks == 1 or not multithread: response = _call_api_for_range(begin, end) return Data(response, selector=selector) print( - f"INFO: Getting data with {chunks} threads. Downloading {required_chunks} required chunks." + f"INFO: Getting data with {actual_threads} threads. Downloading {required_chunks} required chunks." ) - # create roughly equal ranges - chunk_seconds = (end - begin).total_seconds() / chunks + # Create time ranges based on max_days_per_chunk (not number of threads) ranges: List[Tuple[datetime, datetime]] = [] - for i in range(chunks): - b_chunk = begin + timedelta(seconds=math.floor(i * chunk_seconds)) - e_chunk = begin + timedelta(seconds=math.floor((i + 1) * chunk_seconds)) - if i == chunks - 1: - e_chunk = end - ranges.append((b_chunk, e_chunk)) + current_begin = begin + + for i in range(required_chunks): + current_end = min(current_begin + timedelta(days=max_days_per_chunk), end) + ranges.append((current_begin, current_end)) + current_begin = current_end + + if current_begin >= end: + break # perform parallel requests responses: List[Optional[Dict[str, Any]]] = [None] * len(ranges) - with concurrent.futures.ThreadPoolExecutor(max_workers=chunks) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=actual_threads) as executor: future_to_idx = { executor.submit(_call_api_for_range, r[0], r[1]): idx for idx, r in enumerate(ranges)
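
Usage sketch (illustrative only, not part of the patches above): one way the multithreading options added in this series might be called. It assumes these functions are exposed at package level as cwms.get_timeseries / cwms.store_timeseries and that the returned Data object carries the raw payload on its .json attribute, as the docstrings describe; the time series id, office, and dates are hypothetical placeholders.

    from datetime import datetime, timezone

    import cwms

    # Hypothetical identifiers; substitute a real time series and owning office.
    TS_ID = "SomeLocation.Flow.Inst.15Minutes.0.raw"
    OFFICE = "MVP"

    # A multi-year window is split into ranges of at most max_days_per_chunk days
    # and fetched with up to max_threads workers, then merged and deduplicated by
    # date-time before a single Data object is returned.
    data = cwms.get_timeseries(
        ts_id=TS_ID,
        office_id=OFFICE,
        begin=datetime(2020, 1, 1, tzinfo=timezone.utc),
        end=datetime(2025, 1, 1, tzinfo=timezone.utc),
        multithread=True,
        max_threads=20,
        max_days_per_chunk=30,
    )

    # Storing splits the payload into chunks of at most max_values_per_chunk values
    # and posts them in parallel, retrying each failed chunk with exponential
    # backoff up to max_retries times.
    cwms.store_timeseries(
        data=data.json,
        multithread=True,
        max_threads=30,
        max_values_per_chunk=700,
        max_retries=3,
    )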