From 35cf1494a7f866ebe8d09622ff327d3c4dfc1673 Mon Sep 17 00:00:00 2001 From: msweier Date: Fri, 26 Sep 2025 14:09:24 -0500 Subject: [PATCH 01/15] add get_ts_extents --- cwms/catalog/catalog.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/cwms/catalog/catalog.py b/cwms/catalog/catalog.py index e6912ab3..8e0796d2 100644 --- a/cwms/catalog/catalog.py +++ b/cwms/catalog/catalog.py @@ -1,4 +1,7 @@ -from typing import Optional +from datetime import datetime +from typing import Optional, Tuple + +import pandas as pd import cwms.api as api from cwms.cwms_types import Data @@ -130,3 +133,34 @@ def get_timeseries_catalog( response = api.get(endpoint=endpoint, params=params, api_version=2) return Data(response, selector="entries") + + +def get_ts_extents(ts_id: str, office_id: str) -> Tuple[datetime, datetime, datetime]: + """Retrieves earliest extent, latest extent, and last update via cwms.get_timeseries_catalog + + Parameters + ---------- + ts_id: string + Timeseries id to query. + office_id: string + The owning office of the timeseries. 
+ + Returns + ------- + tuple of datetime objects (earliest_time, latest_time, last_update) + """ + cwms_cat = get_timeseries_catalog( + office_id=office_id, + like=ts_id, + timeseries_group_like=None, + page_size=500, + include_extents=True, + ).df + + times = cwms_cat[cwms_cat.name == ts_id].extents.values[0][0] + + earliest_time = pd.to_datetime(times["earliest-time"]) + latest_time = pd.to_datetime(times["latest-time"]) + last_update = pd.to_datetime(times["last-update"]) + + return earliest_time, latest_time, last_update From 287bd4682e500ec1f01463e2a5ed7686182c9d40 Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 29 Sep 2025 10:42:18 -0500 Subject: [PATCH 02/15] add multithread to get_timeseries --- cwms/timeseries/timeseries.py | 177 +++++++++++++++++++++++++++++++++- 1 file changed, 172 insertions(+), 5 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 4d9a6eb0..4be7d037 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -1,6 +1,6 @@ import concurrent.futures -from datetime import datetime -from typing import Any, Dict, Optional +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional import pandas as pd from pandas import DataFrame @@ -71,6 +71,7 @@ def get_ts_ids(ts_id: str) -> Any: begin=begin, end=end, version_date=version_date_dt, + multithread=False, ) result_dict = { "ts_id": ts_id, @@ -113,6 +114,132 @@ def get_ts_ids(ts_id: str) -> Any: return data +def chunk_ts_time_range(begin, end, chunk_size) -> List: + chunks = [] + current = begin + while current < end: + next_chunk = min(current + chunk_size, end) + chunks.append((current, next_chunk)) + current = next_chunk + return chunks + + +def fetch_ts_chunks( + chunks, ts_id, office_id, unit, datum, page_size, version_date, trim, max_workers +): + # Initialize an empty list to store results + results = [] + + # Create a ThreadPoolExecutor to manage multithreading + with 
concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit tasks for each chunk to the api + future_to_chunk = { + executor.submit( + get_timeseries_chunk, + ts_id, + office_id, + unit, + datum, + chunk_start, + chunk_end, + page_size, + version_date, + trim, + ): (chunk_start, chunk_end) + for chunk_start, chunk_end in chunks + } + + # Process completed threads as they finish + for future in concurrent.futures.as_completed(future_to_chunk): + try: + # Retrieve the result of the completed future + result = future.result() + results.append(result) + except Exception as e: + # Log or handle any errors that occur during execution + chunk_start, chunk_end = future_to_chunk[future] + return results + + +def get_timeseries_chunk( + ts_id: str, + office_id: str, + unit: Optional[str] = "EN", + datum: Optional[str] = None, + begin: Optional[datetime] = None, + end: Optional[datetime] = None, + page_size: Optional[int] = 300000, + version_date: Optional[datetime] = None, + trim: Optional[bool] = True, +) -> Data: + + # creates the dataframe from the timeseries data + endpoint = "timeseries" + if begin and not isinstance(begin, datetime): + raise ValueError("begin needs to be in datetime") + if end and not isinstance(end, datetime): + raise ValueError("end needs to be in datetime") + if version_date and not isinstance(version_date, datetime): + raise ValueError("version_date needs to be in datetime") + params = { + "office": office_id, + "name": ts_id, + "unit": unit, + "datum": datum, + "begin": begin.isoformat() if begin else None, + "end": end.isoformat() if end else None, + "page-size": page_size, + "page": None, + "version-date": version_date.isoformat() if version_date else None, + "trim": trim, + } + selector = "values" + + response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params) + return Data(response, selector=selector) + + +def combine_ts_results(results): + """ + Combines the results from multiple chunks into 
a single cwms Data object. + + Parameters + ---------- + results : list + List of cwms Data objects returned from the executor. + + Returns + ------- + cwms Data + Combined cwms Data object with merged DataFrame and updated JSON metadata. + """ + # Extract DataFrames from each cwms Data object + dataframes = [result.df for result in results] + + # Combine all DataFrames into one + combined_df = pd.concat(dataframes, ignore_index=True) + + # Sort the combined DataFrame by 'date-time' + combined_df.sort_values(by="date-time", inplace=True) + + # Drop duplicate rows based on 'date-time' (if necessary) + combined_df.drop_duplicates(subset="date-time", inplace=True) + + # Extract metadata from the first result (assuming all chunks share the same metadata) + combined_json = results[0].json + + # Update metadata to reflect the combined time range + combined_json["begin"] = combined_df["date-time"].min().isoformat() + combined_json["end"] = combined_df["date-time"].max().isoformat() + combined_json["total"] = len(combined_df) + + # Update the "values" key in the JSON to include the combined data + combined_json["values"] = combined_df.to_dict(orient="records") + + # Return a new cwms Data object with the combined DataFrame and updated metadata + return Data(combined_json, selector="values") + + def get_timeseries( ts_id: str, office_id: str, @@ -123,6 +250,9 @@ def get_timeseries( page_size: Optional[int] = 300000, version_date: Optional[datetime] = None, trim: Optional[bool] = True, + multithread: Optional[bool] = True, + max_workers: int = 15, + max_days_per_chunk: int = 14, ) -> Data: """Retrieves time series values from a specified time series and time window. Value date-times obtained are always in UTC. @@ -161,6 +291,12 @@ def get_timeseries( the timeseries is versioned, the query will return the max aggregate for the time period. trim: boolean, optional, default is True Specifies whether to trim missing values from the beginning and end of the retrieved values. 
+ multithread: boolean, optional, default is True + Specifies whether to trim missing values from the beginning and end of the retrieved values. + max_workers: integer, default is 15 + The maximum number of worker threads that will be spawned for multithreading + max_days_per_chunk: integer, default is 14 + The maximum number of days that would be included in a thread Returns ------- cwms data type. data.json will return the JSON output and data.df will return a dataframe. dates are all in UTC @@ -186,10 +322,41 @@ def get_timeseries( "version-date": version_date.isoformat() if version_date else None, "trim": trim, } - selector = "values" - response = api.get_with_paging(selector=selector, endpoint=endpoint, params=params) - return Data(response, selector=selector) + # divide the time range into chunks + chunks = chunk_ts_time_range(begin, end, timedelta(days=max_days_per_chunk)) + + # find max worker thread + max_workers = min(len(chunks), max_workers) + + # if not multithread + if max_workers == 1 or not multithread: + selector = "values" + response = api.get_with_paging( + selector=selector, endpoint="timeseries", params=params + ) + return Data(response, selector=selector) + else: + print( + f"INFO: Fetching {len(chunks)} chunks of timeseries data with {max_workers} threads" + ) + # fetch the data + result_list = fetch_ts_chunks( + chunks, + ts_id, + office_id, + unit, + datum, + page_size, + version_date, + trim, + max_workers, + ) + + # combine the results + results = combine_ts_results(result_list) + + return results def timeseries_df_to_json( From e5061b83d35ab7cdd5219fc236ad9aad75a08e42 Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 29 Sep 2025 13:41:23 -0500 Subject: [PATCH 03/15] add multi to store_timeseries --- cwms/timeseries/timeseries.py | 126 +++++++++++++++++++++++++++++++--- 1 file changed, 117 insertions(+), 9 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 4be7d037..a65f99e2 100644 --- 
a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -114,7 +114,7 @@ def get_ts_ids(ts_id: str) -> Any: return data -def chunk_ts_time_range(begin, end, chunk_size) -> List: +def chunk_timeseries_time_range(begin, end, chunk_size) -> List: chunks = [] current = begin while current < end: @@ -124,7 +124,7 @@ def chunk_ts_time_range(begin, end, chunk_size) -> List: return chunks -def fetch_ts_chunks( +def fetch_timeseries_chunks( chunks, ts_id, office_id, unit, datum, page_size, version_date, trim, max_workers ): # Initialize an empty list to store results @@ -199,7 +199,7 @@ def get_timeseries_chunk( return Data(response, selector=selector) -def combine_ts_results(results): +def combine_timeseries_results(results): """ Combines the results from multiple chunks into a single cwms Data object. @@ -213,9 +213,9 @@ def combine_ts_results(results): cwms Data Combined cwms Data object with merged DataFrame and updated JSON metadata. """ - # Extract DataFrames from each cwms Data object + # Extract DataFrames from each cwms data object dataframes = [result.df for result in results] - + # Combine all DataFrames into one combined_df = pd.concat(dataframes, ignore_index=True) @@ -324,7 +324,7 @@ def get_timeseries( } # divide the time range into chunks - chunks = chunk_ts_time_range(begin, end, timedelta(days=max_days_per_chunk)) + chunks = chunk_timeseries_time_range(begin, end, timedelta(days=max_days_per_chunk)) # find max worker thread max_workers = min(len(chunks), max_workers) @@ -341,7 +341,7 @@ def get_timeseries( f"INFO: Fetching {len(chunks)} chunks of timeseries data with {max_workers} threads" ) # fetch the data - result_list = fetch_ts_chunks( + result_list = fetch_timeseries_chunks( chunks, ts_id, office_id, @@ -354,7 +354,7 @@ def get_timeseries( ) # combine the results - results = combine_ts_results(result_list) + results = combine_timeseries_results(result_list) return results @@ -437,6 +437,7 @@ def store_ts_ids( ts_id: str, office_id: 
str, version_date: Optional[datetime] = None, + multithread=False, ) -> None: try: units = data["units"].iloc[0] @@ -479,11 +480,79 @@ def store_ts_ids( ) +def chunk_timeseries_data(data, chunk_size): + import json + + """ + Splits the time series values into smaller chunks. + + Parameters + ---------- + values : list + List of time series values to be stored. + chunk_size : int + Maximum number of values per chunk. + + Returns + ------- + list + List of chunks, where each chunk is a subset of the values. + """ + values = data["values"] + chunk_list = [] + for i in range(0, len(values), chunk_size): + chunk = values[i : i + chunk_size] + chunked_data = { + "name": data["name"], + "office-id": data["office-id"], + "units": data["units"], + "values": chunk, + "version-date": data.get("version-date"), + } + + chunk_list.append(chunked_data) + return chunk_list + + +def store_timeseries_chunk(data, create_as_ltrs, store_rule, override_protection): + """ + Stores a single chunk of time series data. + + Parameters + ---------- + chunk : list + A subset of time series values to be stored. + create_as_ltrs : bool + Flag indicating if timeseries should be created as Local Regular Time Series. + store_rule : str + The business rule to use when merging the incoming with existing data. + override_protection : bool + A flag to ignore the protected data quality when storing data. + + Returns + ------- + response + API response for the chunk storage. 
+ """ + endpoint = "timeseries" + params = { + "create-as-lrts": create_as_ltrs, + "store-rule": store_rule, + "override-protection": override_protection, + } + + # Make the API call + return api.post(endpoint, data, params) + + def store_timeseries( data: JSON, create_as_ltrs: Optional[bool] = False, store_rule: Optional[str] = None, override_protection: Optional[bool] = False, + multithread: Optional[bool] = True, + max_workers: int = 30, + chunk_size: int = 2 * 7 * 24 * 4, # two weeks of 15 min data ) -> None: """Will Create new TimeSeries if not already present. Will store any data provided @@ -502,6 +571,9 @@ def store_timeseries( DELETE_INSERT. override_protection: str, optional, default is False A flag to ignore the protected data quality when storing data. + multithread: bool, default is true + max_workers: int, maximum numbers of worker threads + chunk_size: int, maximum values that will be saved by a thread Returns ------- @@ -518,7 +590,43 @@ def store_timeseries( if not isinstance(data, dict): raise ValueError("Cannot store a timeseries without a JSON data dictionary") - return api.post(endpoint, data, params) + # Chunk the data + chunks = chunk_timeseries_data(data, chunk_size) + + # if multi-threaded not needed + if len(chunks) == 1 or not multithread: + return api.post(endpoint, data, params) + + print( + f"INFO: Storing {len(chunks)} chunks of timeseries data with {max_workers} threads" + ) + + # Store chunks concurrently + responses = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + # Initialize an empty list to store futures + futures = [] + # Submit each chunk as a separate task to the executor + for chunk in chunks: + future = executor.submit( + store_timeseries_chunk, # The function to execute + chunk, # The chunk of data to store + create_as_ltrs, # Whether to create as LRTS + store_rule, # The store rule to use + override_protection, # Whether to override protection + ) + futures.append(future) # Add the 
future to the list + + for future in concurrent.futures.as_completed(futures): + try: + responses.append(future.result()) + except Exception as e: + start_time = chunk["values"][0][0] + end_time = chunk["values"][-1][0] + print(f"Error storing chunk from {start_time} to {end_time}: {e}") + responses.append({"error": str(e)}) + + return responses def delete_timeseries( From 52d17b7b06002dcc746eb73b4bb41a5153bba8ab Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 29 Sep 2025 15:54:04 -0500 Subject: [PATCH 04/15] fix typing --- cwms/timeseries/timeseries.py | 69 +++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 20 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index a65f99e2..ff9a4efe 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -1,6 +1,6 @@ import concurrent.futures from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import pandas as pd from pandas import DataFrame @@ -114,7 +114,26 @@ def get_ts_ids(ts_id: str) -> Any: return data -def chunk_timeseries_time_range(begin, end, chunk_size) -> List: +def chunk_timeseries_time_range( + begin: datetime, end: datetime, chunk_size: timedelta +) -> List[Tuple[datetime, datetime]]: + """ + Splits a time range into smaller chunks. + + Parameters + ---------- + begin : datetime + The start of the time range. + end : datetime + The end of the time range. + chunk_size : timedelta + The size of each chunk. + + Returns + ------- + List[Tuple[datetime, datetime]] + A list of tuples, where each tuple represents the start and end of a chunk. 
+ """ chunks = [] current = begin while current < end: @@ -125,8 +144,16 @@ def chunk_timeseries_time_range(begin, end, chunk_size) -> List: def fetch_timeseries_chunks( - chunks, ts_id, office_id, unit, datum, page_size, version_date, trim, max_workers -): + chunks: List[Tuple[datetime, datetime]], + ts_id: str, + office_id: str, + unit: str | None, + datum: Optional[str], + page_size: int | None, + version_date: Optional[datetime], + trim: bool | None, + max_workers: int, +) -> List[Data]: # Initialize an empty list to store results results = [] @@ -199,7 +226,7 @@ def get_timeseries_chunk( return Data(response, selector=selector) -def combine_timeseries_results(results): +def combine_timeseries_results(results: List[Data]) -> Data: """ Combines the results from multiple chunks into a single cwms Data object. @@ -304,12 +331,9 @@ def get_timeseries( # creates the dataframe from the timeseries data endpoint = "timeseries" - if begin and not isinstance(begin, datetime): - raise ValueError("begin needs to be in datetime") - if end and not isinstance(end, datetime): - raise ValueError("end needs to be in datetime") - if version_date and not isinstance(version_date, datetime): - raise ValueError("version_date needs to be in datetime") + # Ensure `begin` and `end` are valid datetime objects + begin = begin or datetime.now() - timedelta(days=1) # Default to 24 hours ago + end = end or datetime.now() params = { "office": office_id, "name": ts_id, @@ -437,7 +461,7 @@ def store_ts_ids( ts_id: str, office_id: str, version_date: Optional[datetime] = None, - multithread=False, + multithread: bool = False, ) -> None: try: units = data["units"].iloc[0] @@ -480,9 +504,9 @@ def store_ts_ids( ) -def chunk_timeseries_data(data, chunk_size): - import json - +def chunk_timeseries_data( + data: Dict[str, Any], chunk_size: int +) -> List[Dict[str, Any]]: """ Splits the time series values into smaller chunks. 
@@ -514,7 +538,12 @@ def chunk_timeseries_data(data, chunk_size): return chunk_list -def store_timeseries_chunk(data, create_as_ltrs, store_rule, override_protection): +def store_timeseries_chunk( + data: JSON, + create_as_ltrs: Optional[bool] = False, + store_rule: Optional[str] = None, + override_protection: Optional[bool] = False, +) -> None: """ Stores a single chunk of time series data. @@ -577,7 +606,7 @@ def store_timeseries( Returns ------- - response + None """ endpoint = "timeseries" @@ -602,7 +631,7 @@ def store_timeseries( ) # Store chunks concurrently - responses = [] + responses: List[Dict[str, Any]] = [] with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: # Initialize an empty list to store futures futures = [] @@ -619,14 +648,14 @@ def store_timeseries( for future in concurrent.futures.as_completed(futures): try: - responses.append(future.result()) + responses.append({"success:": future.result()}) except Exception as e: start_time = chunk["values"][0][0] end_time = chunk["values"][-1][0] print(f"Error storing chunk from {start_time} to {end_time}: {e}") responses.append({"error": str(e)}) - return responses + return def delete_timeseries( From f2b80a2cd10674d05e7f9af7a91dc8a6b10f3447 Mon Sep 17 00:00:00 2001 From: msweier Date: Tue, 30 Sep 2025 08:43:20 -0500 Subject: [PATCH 05/15] add extent check --- cwms/timeseries/timeseries.py | 53 +++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index ff9a4efe..9bbf41bb 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -1,11 +1,12 @@ import concurrent.futures -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional, Tuple import pandas as pd from pandas import DataFrame import cwms.api as api +from cwms.catalog.catalog import get_ts_extents from cwms.cwms_types 
import JSON, Data @@ -267,6 +268,20 @@ def combine_timeseries_results(results: List[Data]) -> Data: return Data(combined_json, selector="values") +def validate_dates( + begin: Optional[datetime] = None, end: Optional[datetime] = None +) -> Tuple[datetime, datetime]: + # Ensure `begin` and `end` are valid datetime objects + begin = begin or datetime.now(tz=timezone.utc) - timedelta( + days=1 + ) # Default to 24 hours ago + end = end or datetime.now(tz=timezone.utc) + # assign UTC tz + begin = begin.replace(tzinfo=timezone.utc) + end = end.replace(tzinfo=timezone.utc) + return begin, end + + def get_timeseries( ts_id: str, office_id: str, @@ -278,7 +293,7 @@ def get_timeseries( version_date: Optional[datetime] = None, trim: Optional[bool] = True, multithread: Optional[bool] = True, - max_workers: int = 15, + max_workers: int = 20, max_days_per_chunk: int = 14, ) -> Data: """Retrieves time series values from a specified time series and time window. Value date-times @@ -320,7 +335,7 @@ def get_timeseries( Specifies whether to trim missing values from the beginning and end of the retrieved values. multithread: boolean, optional, default is True Specifies whether to trim missing values from the beginning and end of the retrieved values. - max_workers: integer, default is 15 + max_workers: integer The maximum number of worker threads that will be spawned for multithreading max_days_per_chunk: integer, default is 14 The maximum number of days that would be included in a thread @@ -329,11 +344,9 @@ def get_timeseries( cwms data type. data.json will return the JSON output and data.df will return a dataframe. 
dates are all in UTC """ - # creates the dataframe from the timeseries data endpoint = "timeseries" - # Ensure `begin` and `end` are valid datetime objects - begin = begin or datetime.now() - timedelta(days=1) # Default to 24 hours ago - end = end or datetime.now() + selector = "values" + params = { "office": office_id, "name": ts_id, @@ -347,6 +360,29 @@ def get_timeseries( "trim": trim, } + begin, end = validate_dates(begin=begin, end=end) + + # grab extents if begin is before CWMS DB were implemented to prevent empty queries outside of extents + if begin < datetime(2014, 1, 1, tzinfo=timezone.utc) and multithread: + try: + begin_extent, _, _ = get_ts_extents(ts_id=ts_id, office_id=office_id) + # replace begin with begin extent if outside extents + if begin < begin_extent: + begin = begin_extent + print( + f"INFO: Requested begin was before any data in this timeseries. Reseting to {begin}" + ) + except Exception as e: + # If getting extents fails, fall back to single-threaded mode + print( + f"WARNING: Could not retrieve time series extents ({e}). Falling back to single-threaded mode." + ) + + response = api.get_with_paging( + selector=selector, endpoint="timeseries", params=params + ) + return Data(response, selector=selector) + # divide the time range into chunks chunks = chunk_timeseries_time_range(begin, end, timedelta(days=max_days_per_chunk)) @@ -355,7 +391,6 @@ def get_timeseries( # if not multithread if max_workers == 1 or not multithread: - selector = "values" response = api.get_with_paging( selector=selector, endpoint="timeseries", params=params ) @@ -580,7 +615,7 @@ def store_timeseries( store_rule: Optional[str] = None, override_protection: Optional[bool] = False, multithread: Optional[bool] = True, - max_workers: int = 30, + max_workers: int = 20, chunk_size: int = 2 * 7 * 24 * 4, # two weeks of 15 min data ) -> None: """Will Create new TimeSeries if not already present. 
Will store any data provided From e110b04426a01f05728546e5bec25e4ce5815df6 Mon Sep 17 00:00:00 2001 From: msweier Date: Tue, 30 Sep 2025 10:14:11 -0500 Subject: [PATCH 06/15] add chunk multi tests --- tests/cda/timeseries/timeseries_CDA_test.py | 95 +++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py index 97c3882e..642831b5 100644 --- a/tests/cda/timeseries/timeseries_CDA_test.py +++ b/tests/cda/timeseries/timeseries_CDA_test.py @@ -1,6 +1,8 @@ from datetime import datetime, timedelta, timezone +from unittest.mock import patch import pandas as pd +import pandas.testing as pdt import pytest import cwms @@ -11,6 +13,7 @@ TEST_TSID = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Test" TEST_TSID_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi" TEST_TSID_STORE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Store" +TEST_TSID_CHUNK_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Chunk" TEST_TSID_DELETE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Delete" @@ -131,6 +134,98 @@ def test_get_multi_timeseries_df(): ), f"{ts_id_rev_test} not found in DataFrame columns" +def test_store_timeseries_multi_chunk_ts(): + # Define parameters + start_date = datetime(2025, 9, 8, 0, 0, tzinfo=timezone.utc) # Start date + end_date = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) # End date + ts_id = TEST_TSID_CHUNK_MULTI + office = TEST_OFFICE + units = "ft" + + # Generate 15-minute interval timestamps + dt = pd.date_range(start=start_date, end=end_date, freq="15T", tz="UTC") + + # Generate random values and quality codes + values = [86.57 + (i % 10) * 0.01 for i in range(len(dt))] + quality_codes = [0] * len(dt) + + # Create DataFrame + df = pd.DataFrame( + { + "date-time": dt, + "value": values, + "quality-code": quality_codes, + } + ) + + # Convert DataFrame to JSON format + ts_json = ts.timeseries_df_to_json(df, ts_id, units, office) + + 
# Capture the log output + with patch("builtins.print") as mock_print: + ts.store_timeseries( + ts_json, multithread=True, max_workers=2, chunk_size=2 * 7 * 24 * 4 + ) + + # Extract the log messages + log_messages = [call.args[0] for call in mock_print.call_args_list] + + # Find the relevant log message + store_log = next((msg for msg in log_messages if "INFO: Storing" in msg), None) + assert store_log is not None, "Expected log message not found" + + # Parse the number of chunks and threads + chunks = int(store_log.split("chunks")[0].split()[-1]) + threads = int(store_log.split("with")[1].split()[0]) + + # Assert the expected values + assert chunks == 2, f"Expected 2 chunks, but got {chunks}" + assert threads == 2, f"Expected 2 threads, but got {threads}" + + +def test_read_timeseries_multi_chunk_ts(): + # Define parameters + start_date = datetime(2025, 9, 8, 0, 0, tzinfo=timezone.utc) # Start date + end_date = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) # End date + ts_id = TEST_TSID_CHUNK_MULTI + office = TEST_OFFICE + + # Capture the log output + with patch("builtins.print") as mock_print: + data_multithread = ts.get_timeseries( + ts_id=TEST_TSID_CHUNK_MULTI, + office_id=TEST_OFFICE, + begin=start_date, + end=end_date, + multithread=True, + max_workers=2, + max_days_per_chunk=14, + ) + + # Extract the log messages + log_messages = [call.args[0] for call in mock_print.call_args_list] + + # Find the relevant log message + read_log = next((msg for msg in log_messages if "INFO: Fetching" in msg), None) + assert read_log is not None, "Expected log message not found" + + # Parse the number of chunks and threads + chunks = int(read_log.split("chunks")[0].split()[-1]) + threads = int(read_log.split("with")[1].split()[0]) + + # Assert the expected values + assert chunks == 2, f"Expected 2 chunks, but got {chunks}" + assert threads == 2, f"Expected 2 threads, but got {threads}" + + # Check metadata for multithreaded read + data_json = data_multithread.json + + # 
Check metadata + assert data_json["name"] == TEST_TSID_CHUNK_MULTI + assert data_json["office-id"] == TEST_OFFICE + assert data_json["units"] == "ft" + + def test_delete_timeseries(): ts_id_rev_test = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test") now = datetime.now(timezone.utc).replace(microsecond=0) From 0a0b53cf9d0058edd2c4ac880c00301814fc5dc9 Mon Sep 17 00:00:00 2001 From: msweier Date: Tue, 30 Sep 2025 10:32:58 -0500 Subject: [PATCH 07/15] cleanup variables --- cwms/timeseries/timeseries.py | 4 ++-- tests/cda/timeseries/timeseries_CDA_test.py | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 9bbf41bb..71690b37 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -183,9 +183,10 @@ def fetch_timeseries_chunks( # Retrieve the result of the completed future result = future.result() results.append(result) - except Exception as e: + except Exception: # Log or handle any errors that occur during execution chunk_start, chunk_end = future_to_chunk[future] + print(f'ERROR: Failed to fetch data from {chunk_start} to {chunk_end}') return results @@ -344,7 +345,6 @@ def get_timeseries( cwms data type. data.json will return the JSON output and data.df will return a dataframe. 
dates are all in UTC """ - endpoint = "timeseries" selector = "values" params = { diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py index 642831b5..22bc3c35 100644 --- a/tests/cda/timeseries/timeseries_CDA_test.py +++ b/tests/cda/timeseries/timeseries_CDA_test.py @@ -187,8 +187,6 @@ def test_read_timeseries_multi_chunk_ts(): # Define parameters start_date = datetime(2025, 9, 8, 0, 0, tzinfo=timezone.utc) # Start date end_date = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) # End date - ts_id = TEST_TSID_CHUNK_MULTI - office = TEST_OFFICE # Capture the log output with patch("builtins.print") as mock_print: From b732cf73ec1c9b9f70fe6cfe9833f4e0ee232c21 Mon Sep 17 00:00:00 2001 From: msweier Date: Tue, 30 Sep 2025 10:44:19 -0500 Subject: [PATCH 08/15] format --- cwms/timeseries/timeseries.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 71690b37..89184230 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -183,10 +183,12 @@ def fetch_timeseries_chunks( # Retrieve the result of the completed future result = future.result() results.append(result) - except Exception: + except Exception as e: # Log or handle any errors that occur during execution chunk_start, chunk_end = future_to_chunk[future] - print(f'ERROR: Failed to fetch data from {chunk_start} to {chunk_end}') + print( + f"ERROR: Failed to fetch data from {chunk_start} to {chunk_end}: {e}" + ) return results From 75c6742a9706cbca832699bce92b4ec41d365caf Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 6 Oct 2025 11:19:17 -0500 Subject: [PATCH 09/15] correct thread notification --- cwms/timeseries/timeseries.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 89184230..a8948694 100644 --- a/cwms/timeseries/timeseries.py +++ 
b/cwms/timeseries/timeseries.py @@ -662,9 +662,10 @@ def store_timeseries( # if multi-threaded not needed if len(chunks) == 1 or not multithread: return api.post(endpoint, data, params) - + + actual_workers = min(max_workers, len(chunks)) print( - f"INFO: Storing {len(chunks)} chunks of timeseries data with {max_workers} threads" + f"INFO: Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads" ) # Store chunks concurrently From b8502a06122b8bc7768fb5fd1e2d95e23a14cf8b Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 6 Oct 2025 11:19:49 -0500 Subject: [PATCH 10/15] format --- cwms/timeseries/timeseries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index a8948694..c7e80912 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -662,7 +662,7 @@ def store_timeseries( # if multi-threaded not needed if len(chunks) == 1 or not multithread: return api.post(endpoint, data, params) - + actual_workers = min(max_workers, len(chunks)) print( f"INFO: Storing {len(chunks)} chunks of timeseries data with {actual_workers} threads" From f5e735a903558f09d1c81c139161a8f0462dc6d6 Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 6 Oct 2025 11:20:08 -0500 Subject: [PATCH 11/15] add value checking --- tests/cda/timeseries/timeseries_CDA_test.py | 53 +++++++++++++-------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py index 22bc3c35..e1e3bb4a 100644 --- a/tests/cda/timeseries/timeseries_CDA_test.py +++ b/tests/cda/timeseries/timeseries_CDA_test.py @@ -15,6 +15,14 @@ TEST_TSID_STORE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Store" TEST_TSID_CHUNK_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Chunk" TEST_TSID_DELETE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Delete" +START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, 
tzinfo=timezone.utc) # Start date +END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) # End date +TEST_DT_VALUES_CHUNK_MULTI = [ + (datetime(2025, 8, 1, 15, 15, tzinfo=timezone.utc), 3.14159), + (datetime(2025, 8, 20, 20, 00, tzinfo=timezone.utc), 3.14159 * 2), + (datetime(2025, 9, 15, 5, 15, tzinfo=timezone.utc), 3.14159 * 3), + (datetime(2025, 9, 30, 6, 45, tzinfo=timezone.utc), 3.14159 * 4), +] @pytest.fixture(scope="module", autouse=True) @@ -136,14 +144,14 @@ def test_get_multi_timeseries_df(): def test_store_timeseries_multi_chunk_ts(): # Define parameters - start_date = datetime(2025, 9, 8, 0, 0, tzinfo=timezone.utc) # Start date - end_date = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) # End date ts_id = TEST_TSID_CHUNK_MULTI office = TEST_OFFICE - units = "ft" + units = "m" # Generate 15-minute interval timestamps - dt = pd.date_range(start=start_date, end=end_date, freq="15T", tz="UTC") + dt = pd.date_range( + start=START_DATE_CHUNK_MULTI, end=END_DATE_CHUNK_MULTI, freq="15T", tz="UTC" + ) # Generate random values and quality codes values = [86.57 + (i % 10) * 0.01 for i in range(len(dt))] @@ -158,14 +166,16 @@ def test_store_timeseries_multi_chunk_ts(): } ) + # assign specific values in different chunks for testing + for dt, value in TEST_DT_VALUES_CHUNK_MULTI: + df.loc[df["date-time"] == dt, "value"] = value + # Convert DataFrame to JSON format ts_json = ts.timeseries_df_to_json(df, ts_id, units, office) # Capture the log output with patch("builtins.print") as mock_print: - ts.store_timeseries( - ts_json, multithread=True, max_workers=2, chunk_size=2 * 7 * 24 * 4 - ) + ts.store_timeseries(ts_json, multithread=True, chunk_size=2 * 7 * 24 * 4) # Extract the log messages log_messages = [call.args[0] for call in mock_print.call_args_list] @@ -179,25 +189,21 @@ def test_store_timeseries_multi_chunk_ts(): threads = int(store_log.split("with")[1].split()[0]) # Assert the expected values - assert chunks == 2, f"Expected 2 chunks, 
but got {chunks}" - assert threads == 2, f"Expected 2 threads, but got {threads}" + assert chunks == 5, f"Expected 5 chunks, but got {chunks}" + assert threads == 5, f"Expected 5 threads, but got {threads}" def test_read_timeseries_multi_chunk_ts(): - # Define parameters - start_date = datetime(2025, 9, 8, 0, 0, tzinfo=timezone.utc) # Start date - end_date = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) # End date # Capture the log output with patch("builtins.print") as mock_print: data_multithread = ts.get_timeseries( ts_id=TEST_TSID_CHUNK_MULTI, office_id=TEST_OFFICE, - begin=start_date, - end=end_date, - multithread=True, - max_workers=2, + begin=START_DATE_CHUNK_MULTI, + end=END_DATE_CHUNK_MULTI, max_days_per_chunk=14, + unit="SI", ) # Extract the log messages @@ -212,16 +218,25 @@ def test_read_timeseries_multi_chunk_ts(): threads = int(read_log.split("with")[1].split()[0]) # Assert the expected values - assert chunks == 2, f"Expected 2 chunks, but got {chunks}" - assert threads == 2, f"Expected 2 threads, but got {threads}" + assert chunks == 5, f"Expected 5 chunks, but got {chunks}" + assert threads == 5, f"Expected 5 threads, but got {threads}" # Check metadata for multithreaded read data_json = data_multithread.json + # check values + df = data_multithread.df.copy() + # assign specific values in different chunks for testing + for dt, value in TEST_DT_VALUES_CHUNK_MULTI: + test_value = df.loc[df["date-time"] == dt, "value"].values[0] + assert ( + test_value == value + ), f"Expected value at {dt} to equal {value}, but got {test_value}" + # Check metadata assert data_json["name"] == TEST_TSID_CHUNK_MULTI assert data_json["office-id"] == TEST_OFFICE - assert data_json["units"] == "ft" + assert data_json["units"] == "m" def test_delete_timeseries(): From 037fa00e53797626a27adaad7fd01d7edaf22f3e Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 6 Oct 2025 11:46:58 -0500 Subject: [PATCH 12/15] update freq --- tests/cda/timeseries/timeseries_CDA_test.py | 2 
+- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py index e1e3bb4a..4d4d409a 100644 --- a/tests/cda/timeseries/timeseries_CDA_test.py +++ b/tests/cda/timeseries/timeseries_CDA_test.py @@ -150,7 +150,7 @@ def test_store_timeseries_multi_chunk_ts(): # Generate 15-minute interval timestamps dt = pd.date_range( - start=START_DATE_CHUNK_MULTI, end=END_DATE_CHUNK_MULTI, freq="15T", tz="UTC" + start=START_DATE_CHUNK_MULTI, end=END_DATE_CHUNK_MULTI, freq="15min", tz="UTC" ) # Generate random values and quality codes From becbe69395078bf96b16cf00dbc8346d3971d5c8 Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 6 Oct 2025 13:43:25 -0500 Subject: [PATCH 13/15] update documentation --- cwms/timeseries/timeseries.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index c7e80912..4d69a447 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -338,10 +338,10 @@ def get_timeseries( Specifies whether to trim missing values from the beginning and end of the retrieved values. multithread: boolean, optional, default is True Specifies whether to trim missing values from the beginning and end of the retrieved values. - max_workers: integer - The maximum number of worker threads that will be spawned for multithreading + max_workers: integer, default is 20 + The maximum number of worker threads that will be spawned for multithreading. If calling more than 3 years of 15 minute data, consider using 30 max_workers max_days_per_chunk: integer, default is 14 - The maximum number of days that would be included in a thread + The maximum number of days that would be included in a thread. If calling more than 1 year of 15 minute data, consider using 30 days Returns ------- cwms data type. data.json will return the JSON output and data.df will return a dataframe.
dates are all in UTC @@ -638,8 +638,11 @@ def store_timeseries( override_protection: str, optional, default is False A flag to ignore the protected data quality when storing data. multithread: bool, default is true - max_workers: int, maximum numbers of worker threads - chunk_size: int, maximum values that will be saved by a thread + max_workers: int, default is 20, maximum number of worker threads, + if saving more than 3 years of 15 minute data, consider using 30 workers + chunk_size: int, default is 2 * 7 * 24 * 4 (two weeks of 15 minute data), + maximum values that will be saved by a thread, if saving more than 3 years of 15 minute data, + consider using 30 days of 15 minute data Returns ------- From 70fc788225231a6c21b5cfc2d9d5dce9a7d57e72 Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 6 Oct 2025 13:44:49 -0500 Subject: [PATCH 14/15] add num of values and non null check --- tests/cda/timeseries/timeseries_CDA_test.py | 27 +++++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py index 4d4d409a..19e5ffb5 100644 --- a/tests/cda/timeseries/timeseries_CDA_test.py +++ b/tests/cda/timeseries/timeseries_CDA_test.py @@ -23,6 +23,11 @@ (datetime(2025, 9, 15, 5, 15, tzinfo=timezone.utc), 3.14159 * 3), (datetime(2025, 9, 30, 6, 45, tzinfo=timezone.utc), 3.14159 * 4), ] +# Generate 15-minute interval timestamps +DT_CHUNK_MULTI = pd.date_range( + start=START_DATE_CHUNK_MULTI, end=END_DATE_CHUNK_MULTI, freq="15min", tz="UTC" +) +NUMBER_OF_VAL_CHUNK_MULTI = len(DT_CHUNK_MULTI) @pytest.fixture(scope="module", autouse=True) @@ -148,19 +153,14 @@ def test_store_timeseries_multi_chunk_ts(): office = TEST_OFFICE units = "m" - # Generate 15-minute interval timestamps - dt = pd.date_range( - start=START_DATE_CHUNK_MULTI, end=END_DATE_CHUNK_MULTI, freq="15min", tz="UTC" - ) - # Generate random values and quality codes - values = [86.57 + (i % 10) * 0.01 for i in
range(len(dt))] - quality_codes = [0] * len(dt) + values = [86.57 + (i % 10) * 0.01 for i in range(NUMBER_OF_VAL_CHUNK_MULTI)] + quality_codes = [0] * NUMBER_OF_VAL_CHUNK_MULTI # Create DataFrame df = pd.DataFrame( { - "date-time": dt, + "date-time": DT_CHUNK_MULTI, "value": values, "quality-code": quality_codes, } @@ -226,6 +226,17 @@ def test_read_timeseries_multi_chunk_ts(): # check values df = data_multithread.df.copy() + + # check length of data + assert ( + len(df) == NUMBER_OF_VAL_CHUNK_MULTI + ), f"Expected {NUMBER_OF_VAL_CHUNK_MULTI} values, but got {len(df)}" + # check to make sure no nan were stored + df_cleaned = df.dropna(subset=["value"]) + assert ( + len(df_cleaned) == NUMBER_OF_VAL_CHUNK_MULTI + ), f"Expected {NUMBER_OF_VAL_CHUNK_MULTI} non-null values, but got {len(df_cleaned)}" + # assign specific values in different chunks for testing for dt, value in TEST_DT_VALUES_CHUNK_MULTI: test_value = df.loc[df["date-time"] == dt, "value"].values[0] From e90b7d2a0a20098557dce3d52224b9b7550e1812 Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 6 Oct 2025 14:46:41 -0500 Subject: [PATCH 15/15] compare entire df with read/write --- tests/cda/timeseries/timeseries_CDA_test.py | 65 +++++++-------------- 1 file changed, 20 insertions(+), 45 deletions(-) diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py index 19e5ffb5..0884c7aa 100644 --- a/tests/cda/timeseries/timeseries_CDA_test.py +++ b/tests/cda/timeseries/timeseries_CDA_test.py @@ -15,19 +15,23 @@ TEST_TSID_STORE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Store" TEST_TSID_CHUNK_MULTI = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Chunk" TEST_TSID_DELETE = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Delete" -START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, tzinfo=timezone.utc) # Start date -END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) # End date -TEST_DT_VALUES_CHUNK_MULTI = [ - (datetime(2025, 8, 1, 15, 
15, tzinfo=timezone.utc), 3.14159), - (datetime(2025, 8, 20, 20, 00, tzinfo=timezone.utc), 3.14159 * 2), - (datetime(2025, 9, 15, 5, 15, tzinfo=timezone.utc), 3.14159 * 3), - (datetime(2025, 9, 30, 6, 45, tzinfo=timezone.utc), 3.14159 * 4), -] # Generate 15-minute interval timestamps +START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, tzinfo=timezone.utc) +END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) DT_CHUNK_MULTI = pd.date_range( - start=START_DATE_CHUNK_MULTI, end=END_DATE_CHUNK_MULTI, freq="15min", tz="UTC" + start=START_DATE_CHUNK_MULTI, + end=END_DATE_CHUNK_MULTI, + freq="15min", + tz="UTC", +) +# Create DataFrame +DF_CHUNK_MULTI = pd.DataFrame( + { + "date-time": DT_CHUNK_MULTI, + "value": [86.57 + (i % 10) * 0.01 for i in range(len(DT_CHUNK_MULTI))], + "quality-code": [0] * len(DT_CHUNK_MULTI), + } ) -NUMBER_OF_VAL_CHUNK_MULTI = len(DT_CHUNK_MULTI) @pytest.fixture(scope="module", autouse=True) @@ -153,25 +157,8 @@ def test_store_timeseries_multi_chunk_ts(): office = TEST_OFFICE units = "m" - # Generate random values and quality codes - values = [86.57 + (i % 10) * 0.01 for i in range(NUMBER_OF_VAL_CHUNK_MULTI)] - quality_codes = [0] * NUMBER_OF_VAL_CHUNK_MULTI - - # Create DataFrame - df = pd.DataFrame( - { - "date-time": DT_CHUNK_MULTI, - "value": values, - "quality-code": quality_codes, - } - ) - - # assign specific values in different chunks for testing - for dt, value in TEST_DT_VALUES_CHUNK_MULTI: - df.loc[df["date-time"] == dt, "value"] = value - # Convert DataFrame to JSON format - ts_json = ts.timeseries_df_to_json(df, ts_id, units, office) + ts_json = ts.timeseries_df_to_json(DF_CHUNK_MULTI, ts_id, units, office) # Capture the log output with patch("builtins.print") as mock_print: @@ -224,25 +211,13 @@ def test_read_timeseries_multi_chunk_ts(): # Check metadata for multithreaded read data_json = data_multithread.json - # check values + # check df values df = data_multithread.df.copy() - # check length of data - assert 
( - len(df) == NUMBER_OF_VAL_CHUNK_MULTI - ), f"Expected {NUMBER_OF_VAL_CHUNK_MULTI} values, but got {len(df)}" - # check to make sure no nan were stored - df_cleaned = df.dropna(subset=["value"]) - assert ( - len(df_cleaned) == NUMBER_OF_VAL_CHUNK_MULTI - ), f"Expected {NUMBER_OF_VAL_CHUNK_MULTI} non-null values, but got {len(df_cleaned)}" - - # assign specific values in different chunks for testing - for dt, value in TEST_DT_VALUES_CHUNK_MULTI: - test_value = df.loc[df["date-time"] == dt, "value"].values[0] - assert ( - test_value == value - ), f"Expected value at {dt} to equal {value}, but got {test_value}" + # make sure the dataframe matches stored dataframe + pdt.assert_frame_equal( + df, DF_CHUNK_MULTI + ), f"Data frames do not match: original = {DF_CHUNK_MULTI.describe()}, stored = {df.describe()}" # Check metadata assert data_json["name"] == TEST_TSID_CHUNK_MULTI