diff --git a/cwms/catalog/catalog.py b/cwms/catalog/catalog.py
index e6912ab3..8e0796d2 100644
--- a/cwms/catalog/catalog.py
+++ b/cwms/catalog/catalog.py
@@ -1,4 +1,7 @@
-from typing import Optional
+from datetime import datetime
+from typing import Optional, Tuple
+
+import pandas as pd
 
 import cwms.api as api
 from cwms.cwms_types import Data
@@ -130,3 +133,34 @@ def get_timeseries_catalog(
 
     response = api.get(endpoint=endpoint, params=params, api_version=2)
     return Data(response, selector="entries")
+
+
+def get_ts_extents(ts_id: str, office_id: str) -> Tuple[datetime, datetime, datetime]:
+    """Retrieves the earliest extent, latest extent, and last update via cwms.get_timeseries_catalog.
+
+    Parameters
+    ----------
+    ts_id: string
+        Timeseries id to query.
+    office_id: string
+        The owning office of the timeseries.
+
+    Returns
+    -------
+    tuple of datetime objects (earliest_time, latest_time, last_update)
+    """
+    cwms_cat = get_timeseries_catalog(
+        office_id=office_id,
+        like=ts_id,
+        timeseries_group_like=None,
+        page_size=500,
+        include_extents=True,
+    ).df
+
+    times = cwms_cat[cwms_cat.name == ts_id].extents.values[0][0]
+
+    earliest_time = pd.to_datetime(times["earliest-time"])
+    latest_time = pd.to_datetime(times["latest-time"])
+    last_update = pd.to_datetime(times["last-update"])
+
+    return earliest_time, latest_time, last_update
diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 4d9a6eb0..4d69a447 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -1,11 +1,12 @@
 import concurrent.futures
-from datetime import datetime
-from typing import Any, Dict, Optional
+from datetime import datetime, timedelta, timezone
+from typing import Any, Dict, List, Optional, Tuple
 
 import pandas as pd
 from pandas import DataFrame
 
 import cwms.api as api
+from cwms.catalog.catalog import get_ts_extents
 from cwms.cwms_types import JSON, Data
 
 
@@ -71,6 +72,7 @@ def get_ts_ids(ts_id: str) -> Any:
                 begin=begin,
                 end=end,
                 version_date=version_date_dt,
+                multithread=False,
             )
             result_dict = {
                 "ts_id": ts_id,
@@ -113,6 +115,176 @@ def get_ts_ids(ts_id: str) -> Any:
     return data
 
 
+def chunk_timeseries_time_range(
+    begin: datetime, end: datetime, chunk_size: timedelta
+) -> List[Tuple[datetime, datetime]]:
+    """
+    Splits a time range into smaller chunks.
+
+    Parameters
+    ----------
+    begin : datetime
+        The start of the time range.
+    end : datetime
+        The end of the time range.
+    chunk_size : timedelta
+        The size of each chunk.
+
+    Returns
+    -------
+    List[Tuple[datetime, datetime]]
+        A list of tuples, where each tuple represents the start and end of a chunk.
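+
+    Examples
+    --------
+    Illustrative only (the dates below are arbitrary)::
+
+        chunk_timeseries_time_range(
+            datetime(2024, 1, 1), datetime(2024, 1, 31), timedelta(days=14)
+        )
+        # -> [(Jan 1, Jan 15), (Jan 15, Jan 29), (Jan 29, Jan 31)]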
+ """ + chunks = [] + current = begin + while current < end: + next_chunk = min(current + chunk_size, end) + chunks.append((current, next_chunk)) + current = next_chunk + return chunks + + +def fetch_timeseries_chunks( + chunks: List[Tuple[datetime, datetime]], + ts_id: str, + office_id: str, + unit: str | None, + datum: Optional[str], + page_size: int | None, + version_date: Optional[datetime], + trim: bool | None, + max_workers: int, +) -> List[Data]: + # Initialize an empty list to store results + results = [] + + # Create a ThreadPoolExecutor to manage multithreading + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit tasks for each chunk to the api + future_to_chunk = { + executor.submit( + get_timeseries_chunk, + ts_id, + office_id, + unit, + datum, + chunk_start, + chunk_end, + page_size, + version_date, + trim, + ): (chunk_start, chunk_end) + for chunk_start, chunk_end in chunks + } + + # Process completed threads as they finish + for future in concurrent.futures.as_completed(future_to_chunk): + try: + # Retrieve the result of the completed future + result = future.result() + results.append(result) + except Exception as e: + # Log or handle any errors that occur during execution + chunk_start, chunk_end = future_to_chunk[future] + print( + f"ERROR: Failed to fetch data from {chunk_start} to {chunk_end}: {e}" + ) + return results + + +def get_timeseries_chunk( + ts_id: str, + office_id: str, + unit: Optional[str] = "EN", + datum: Optional[str] = None, + begin: Optional[datetime] = None, + end: Optional[datetime] = None, + page_size: Optional[int] = 300000, + version_date: Optional[datetime] = None, + trim: Optional[bool] = True, +) -> Data: + + # creates the dataframe from the timeseries data + endpoint = "timeseries" + if begin and not isinstance(begin, datetime): + raise ValueError("begin needs to be in datetime") + if end and not isinstance(end, datetime): + raise ValueError("end needs to be in datetime") + if version_date and not isinstance(version_date, datetime): + raise ValueError("version_date needs to be in datetime") + params = { + "office": office_id, + "name": ts_id, + "unit": unit, + "datum": datum, + "begin": begin.isoformat() if begin else None, + "end": end.isoformat() if end else None, + "page-size": page_size, + "page": None, + "version-date": version_date.isoformat() if version_date else None, + "trim": trim, + } + selector = "values" + + response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params) + return Data(response, selector=selector) + + +def combine_timeseries_results(results: List[Data]) -> Data: + """ + Combines the results from multiple chunks into a single cwms Data object. + + Parameters + ---------- + results : list + List of cwms Data objects returned from the executor. + + Returns + ------- + cwms Data + Combined cwms Data object with merged DataFrame and updated JSON metadata. 
+ """ + # Extract DataFrames from each cwms data object + dataframes = [result.df for result in results] + + # Combine all DataFrames into one + combined_df = pd.concat(dataframes, ignore_index=True) + + # Sort the combined DataFrame by 'date-time' + combined_df.sort_values(by="date-time", inplace=True) + + # Drop duplicate rows based on 'date-time' (if necessary) + combined_df.drop_duplicates(subset="date-time", inplace=True) + + # Extract metadata from the first result (assuming all chunks share the same metadata) + combined_json = results[0].json + + # Update metadata to reflect the combined time range + combined_json["begin"] = combined_df["date-time"].min().isoformat() + combined_json["end"] = combined_df["date-time"].max().isoformat() + combined_json["total"] = len(combined_df) + + # Update the "values" key in the JSON to include the combined data + combined_json["values"] = combined_df.to_dict(orient="records") + + # Return a new cwms Data object with the combined DataFrame and updated metadata + return Data(combined_json, selector="values") + + +def validate_dates( + begin: Optional[datetime] = None, end: Optional[datetime] = None +) -> Tuple[datetime, datetime]: + # Ensure `begin` and `end` are valid datetime objects + begin = begin or datetime.now(tz=timezone.utc) - timedelta( + days=1 + ) # Default to 24 hours ago + end = end or datetime.now(tz=timezone.utc) + # assign UTC tz + begin = begin.replace(tzinfo=timezone.utc) + end = end.replace(tzinfo=timezone.utc) + return begin, end + + def get_timeseries( ts_id: str, office_id: str, @@ -123,6 +295,9 @@ def get_timeseries( page_size: Optional[int] = 300000, version_date: Optional[datetime] = None, trim: Optional[bool] = True, + multithread: Optional[bool] = True, + max_workers: int = 20, + max_days_per_chunk: int = 14, ) -> Data: """Retrieves time series values from a specified time series and time window. Value date-times obtained are always in UTC. @@ -161,19 +336,19 @@ def get_timeseries( the timeseries is versioned, the query will return the max aggregate for the time period. trim: boolean, optional, default is True Specifies whether to trim missing values from the beginning and end of the retrieved values. + multithread: boolean, optional, default is True + Specifies whether to trim missing values from the beginning and end of the retrieved values. + max_workers: integer, default is 20 + The maximum number of worker threads that will be spawned for multithreading, If calling more than 3 years of 15 minute data, consider using 30 max_workers + max_days_per_chunk: integer, default is 14 + The maximum number of days that would be included in a thread. If calling more than 1 year of 15 minute data, consider using 30 days Returns ------- cwms data type. data.json will return the JSON output and data.df will return a dataframe. 
         dates are all in UTC
     """
-    # creates the dataframe from the timeseries data
-    endpoint = "timeseries"
-    if begin and not isinstance(begin, datetime):
-        raise ValueError("begin needs to be in datetime")
-    if end and not isinstance(end, datetime):
-        raise ValueError("end needs to be in datetime")
-    if version_date and not isinstance(version_date, datetime):
-        raise ValueError("version_date needs to be in datetime")
+    selector = "values"
+
     params = {
         "office": office_id,
         "name": ts_id,
@@ -186,10 +361,63 @@ def get_timeseries(
         "version-date": version_date.isoformat() if version_date else None,
         "trim": trim,
     }
-    selector = "values"
-
-    response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params)
-    return Data(response, selector=selector)
+
+    begin, end = validate_dates(begin=begin, end=end)
+
+    # if begin predates the CWMS database, check the extents to prevent empty queries outside the period of record
+    if begin < datetime(2014, 1, 1, tzinfo=timezone.utc) and multithread:
+        try:
+            begin_extent, _, _ = get_ts_extents(ts_id=ts_id, office_id=office_id)
+            # replace begin with the earliest extent if the request starts before it
+            if begin < begin_extent:
+                begin = begin_extent
+                print(
+                    f"INFO: Requested begin was before any data in this timeseries. Resetting to {begin}"
+                )
+        except Exception as e:
+            # If getting extents fails, fall back to single-threaded mode
+            print(
+                f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode."
+            )
+
+            response = api.get_with_paging(
+                selector=selector, endpoint="timeseries", params=params
+            )
+            return Data(response, selector=selector)
+
+    # divide the time range into chunks
+    chunks = chunk_timeseries_time_range(begin, end, timedelta(days=max_days_per_chunk))
+
+    # cap the worker count at the number of chunks
+    max_workers = min(len(chunks), max_workers)
+
+    # single-threaded request
+    if max_workers == 1 or not multithread:
+        response = api.get_with_paging(
+            selector=selector, endpoint="timeseries", params=params
+        )
+        return Data(response, selector=selector)
+    else:
+        print(
+            f"INFO: Fetching {len(chunks)} chunks of timeseries data with {max_workers} threads"
+        )
+        # fetch the data
+        result_list = fetch_timeseries_chunks(
+            chunks,
+            ts_id,
+            office_id,
+            unit,
+            datum,
+            page_size,
+            version_date,
+            trim,
+            max_workers,
+        )
+
+        # combine the results
+        results = combine_timeseries_results(result_list)
+
+        return results
 
 
 def timeseries_df_to_json(
@@ -270,6 +498,7 @@ def store_ts_ids(
     ts_id: str,
     office_id: str,
     version_date: Optional[datetime] = None,
+    multithread: bool = False,
 ) -> None:
     try:
         units = data["units"].iloc[0]
@@ -312,11 +541,84 @@ def store_ts_ids(
     )
 
 
+def chunk_timeseries_data(
+    data: Dict[str, Any], chunk_size: int
+) -> List[Dict[str, Any]]:
+    """
+    Splits the time series values into smaller chunks.
+
+    Parameters
+    ----------
+    data : dict
+        Time series JSON dictionary whose "values" list will be chunked.
+    chunk_size : int
+        Maximum number of values per chunk.
+
+    Returns
+    -------
+    list
+        List of chunks, where each chunk is a subset of the values.
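+
+    Examples
+    --------
+    Illustrative only; ``ts_json`` stands for the dictionary produced by
+    ``timeseries_df_to_json``. With ``chunk_size=1344`` (two weeks of
+    15-minute data), 5952 values are split into five chunks::
+
+        chunks = chunk_timeseries_data(ts_json, chunk_size=1344)
+        [len(c["values"]) for c in chunks]  # [1344, 1344, 1344, 1344, 576]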
+ """ + values = data["values"] + chunk_list = [] + for i in range(0, len(values), chunk_size): + chunk = values[i : i + chunk_size] + chunked_data = { + "name": data["name"], + "office-id": data["office-id"], + "units": data["units"], + "values": chunk, + "version-date": data.get("version-date"), + } + + chunk_list.append(chunked_data) + return chunk_list + + +def store_timeseries_chunk( + data: JSON, + create_as_ltrs: Optional[bool] = False, + store_rule: Optional[str] = None, + override_protection: Optional[bool] = False, +) -> None: + """ + Stores a single chunk of time series data. + + Parameters + ---------- + chunk : list + A subset of time series values to be stored. + create_as_ltrs : bool + Flag indicating if timeseries should be created as Local Regular Time Series. + store_rule : str + The business rule to use when merging the incoming with existing data. + override_protection : bool + A flag to ignore the protected data quality when storing data. + + Returns + ------- + response + API response for the chunk storage. + """ + endpoint = "timeseries" + params = { + "create-as-lrts": create_as_ltrs, + "store-rule": store_rule, + "override-protection": override_protection, + } + + # Make the API call + return api.post(endpoint, data, params) + + def store_timeseries( data: JSON, create_as_ltrs: Optional[bool] = False, store_rule: Optional[str] = None, override_protection: Optional[bool] = False, + multithread: Optional[bool] = True, + max_workers: int = 20, + chunk_size: int = 2 * 7 * 24 * 4, # two weeks of 15 min data ) -> None: """Will Create new TimeSeries if not already present. Will store any data provided @@ -335,10 +637,16 @@ def store_timeseries( DELETE_INSERT. override_protection: str, optional, default is False A flag to ignore the protected data quality when storing data. 
+    multithread: bool, optional, default is True
+        Specifies whether to split the data into chunks and store them in parallel threads.
+    max_workers: int, default is 20
+        The maximum number of worker threads. If storing more than 3 years of 15-minute data, consider using 30.
+    chunk_size: int, default is 2 * 7 * 24 * 4 (two weeks of 15-minute data)
+        The maximum number of values stored by a single thread. For large stores, consider 30 days of 15-minute data.
 
     Returns
     -------
-    response
+    None
     """
 
     endpoint = "timeseries"
@@ -351,7 +659,44 @@ def store_timeseries(
 
     if not isinstance(data, dict):
         raise ValueError("Cannot store a timeseries without a JSON data dictionary")
 
-    return api.post(endpoint, data, params)
+    # Chunk the data
+    chunks = chunk_timeseries_data(data, chunk_size)
+
+    # if multi-threading is not needed
+    if len(chunks) == 1 or not multithread:
+        return api.post(endpoint, data, params)
+
+    actual_workers = min(max_workers, len(chunks))
+    print(
+        f"INFO: Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads"
+    )
+
+    # Store chunks concurrently
+    responses: List[Dict[str, Any]] = []
+    with concurrent.futures.ThreadPoolExecutor(max_workers=actual_workers) as executor:
+        # Map each submitted future to its chunk so failures can be reported per chunk
+        future_to_chunk = {
+            executor.submit(
+                store_timeseries_chunk,  # The function to execute
+                chunk,  # The chunk of data to store
+                create_as_ltrs,  # Whether to create as LRTS
+                store_rule,  # The store rule to use
+                override_protection,  # Whether to override protection
+            ): chunk
+            for chunk in chunks
+        }
+        # Process completed threads as they finish
+        for future in concurrent.futures.as_completed(future_to_chunk):
+            try:
+                responses.append({"success": future.result()})
+            except Exception as e:
+                chunk = future_to_chunk[future]
+                start_time = chunk["values"][0][0]
+                end_time = chunk["values"][-1][0]
+                print(f"ERROR: Failed to store chunk from {start_time} to {end_time}: {e}")
+                responses.append({"error": str(e)})
+
+    return
 
 
 def delete_timeseries(
diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index 97c3882e..0884c7aa 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -1,6 +1,8 @@
 from datetime import datetime, timedelta, timezone
+from unittest.mock import patch
 
 import pandas as pd
+import pandas.testing as pdt
 import pytest
 
 import cwms
@@ -11,7 +13,25 @@ TEST_TSID = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Test"
 TEST_TSID_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi"
 TEST_TSID_STORE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Store"
+TEST_TSID_CHUNK_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Chunk"
 TEST_TSID_DELETE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Delete"
+# Generate 15-minute interval timestamps
+START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, tzinfo=timezone.utc)
+END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc)
+DT_CHUNK_MULTI = pd.date_range(
+    start=START_DATE_CHUNK_MULTI,
+    end=END_DATE_CHUNK_MULTI,
+    freq="15min",
+    tz="UTC",
+)
+# Create DataFrame
+DF_CHUNK_MULTI = pd.DataFrame(
+    {
+        "date-time": DT_CHUNK_MULTI,
+        "value": [86.57 + (i % 10) * 0.01 for i in range(len(DT_CHUNK_MULTI))],
+        "quality-code": [0] * len(DT_CHUNK_MULTI),
+    }
+)
 
 
 @pytest.fixture(scope="module", autouse=True)
@@ -131,6 +151,80 @@ def test_get_multi_timeseries_df():
     ), f"{ts_id_rev_test} not found in DataFrame columns"
 
 
+def test_store_timeseries_multi_chunk_ts():
+    # Define parameters
+    ts_id = TEST_TSID_CHUNK_MULTI
+    office = TEST_OFFICE
+    units = "m"
+
+    # Convert DataFrame to JSON format
+    ts_json = ts.timeseries_df_to_json(DF_CHUNK_MULTI, ts_id, units, office)
+
+    # Capture the log output
+    with patch("builtins.print") as mock_print:
+        ts.store_timeseries(ts_json, multithread=True, chunk_size=2 * 7 * 24 * 4)
+
+    # Extract the log messages
+    log_messages = [call.args[0] for call in mock_print.call_args_list]
+
+    # Find the relevant log message
+    store_log = next((msg for msg in log_messages if "INFO: Storing" in msg), None)
+    assert store_log is not None, "Expected log message not found"
+
+    # Parse the number of chunks and threads
+    chunks = int(store_log.split("chunks")[0].split()[-1])
+    threads = int(store_log.split("with")[1].split()[0])
+
+    # Assert the expected values
+    assert chunks == 5, f"Expected 5 chunks, but got {chunks}"
+    assert threads == 5, f"Expected 5 threads, but got {threads}"
+
+
+def test_read_timeseries_multi_chunk_ts():
+
+    # Capture the log output
+    with patch("builtins.print") as mock_print:
+        data_multithread = ts.get_timeseries(
+            ts_id=TEST_TSID_CHUNK_MULTI,
+            office_id=TEST_OFFICE,
+            begin=START_DATE_CHUNK_MULTI,
+            end=END_DATE_CHUNK_MULTI,
+            max_days_per_chunk=14,
+            unit="SI",
+        )
+
+    # Extract the log messages
+    log_messages = [call.args[0] for call in mock_print.call_args_list]
+
+    # Find the relevant log message
+    read_log = next((msg for msg in log_messages if "INFO: Fetching" in msg), None)
+    assert read_log is not None, "Expected log message not found"
+
+    # Parse the number of chunks and threads
+    chunks = int(read_log.split("chunks")[0].split()[-1])
+    threads = int(read_log.split("with")[1].split()[0])
+
+    # Assert the expected values
+    assert chunks == 5, f"Expected 5 chunks, but got {chunks}"
+    assert threads == 5, f"Expected 5 threads, but got {threads}"
+
+    # Check metadata for multithreaded read
+    data_json = data_multithread.json
+
+    # check df values
+    df = data_multithread.df.copy()
+
+    # make sure the dataframe matches the stored dataframe
+    # (assert_frame_equal raises an AssertionError with a detailed diff on mismatch)
+    pdt.assert_frame_equal(df, DF_CHUNK_MULTI)
+
+    # Check metadata
+    assert data_json["name"] == TEST_TSID_CHUNK_MULTI
+    assert data_json["office-id"] == TEST_OFFICE
+    assert data_json["units"] == "m"
+
+
 def test_delete_timeseries():
     ts_id_rev_test = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
     now = datetime.now(timezone.utc).replace(microsecond=0)