From 43a6856725e6e579c9284ea475ae8bca1fb6e5d0 Mon Sep 17 00:00:00 2001 From: BaptisteDE Date: Mon, 13 Jan 2025 11:48:13 +0100 Subject: [PATCH 1/6] =?UTF-8?q?=E2=9C=A8get=5Finflux=5Fdata,=20push=5Finfl?= =?UTF-8?q?ux=5Fdata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- setup.py | 1 + tide/influx.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 tide/influx.py diff --git a/setup.py b/setup.py index 77b1f2e..d2eb995 100644 --- a/setup.py +++ b/setup.py @@ -39,6 +39,7 @@ "matplotlib>=3.5.1", "plotly>=5.3.1", "requests>=2.32.3", + "influxdb-client>=1.48.0", ], packages=find_packages(exclude=["tests*"]), include_package_data=True, diff --git a/tide/influx.py b/tide/influx.py new file mode 100644 index 0000000..08e3e5e --- /dev/null +++ b/tide/influx.py @@ -0,0 +1,82 @@ +import datetime as dt + +import pandas as pd +from influxdb_client import InfluxDBClient + +from tide.utils import check_and_return_dt_index_df + + +def date_objects_tostring(date): + if isinstance(date, dt.datetime) or isinstance(date, pd.Timestamp): + return date.strftime("%Y-%m-%d %H:%M") + else: + return date + + +def get_influx_data( + start: str | pd.Timestamp | dt.datetime, + stop: str | pd.Timestamp | dt.datetime, + bucket: str, + measurement: str, + tide_tags: list[str], + url: str, + org: str, + token: str, +): + client = InfluxDBClient(url=url, org=org, token=token) + + query_api = client.query_api() + query = f""" + from(bucket: {bucket}) + |> range(start: {date_objects_tostring(start)}, stop: {date_objects_tostring(stop)}) + |> filter(fn: (r) => r["_measurement"] == {measurement}) + |> map(fn: (r) => ({{r with tide: r.{tide_tags[0]} + "__" + r.{tide_tags[1]} + "__" + r.{tide_tags[2]} + "__" + r.{tide_tags[3]}}})) + |> keep(columns: ["_time", "_value", "tide"]) + |> pivot(rowKey: ["_time"], columnKey: ["tide"], valueColumn: "_value") + |> sort(columns: ["_time"]) + """ + + tables = query_api.query(query) + + records = [] + for table in tables: + for record in table.records: + records.append(record.values) + + df = pd.DataFrame(records) + df["_time"] = pd.to_datetime(df["_time"]) + df.set_index("_time", inplace=True) + df.drop(["result", "table"], axis=1, inplace=True) + + return df + + +def push_influx_data( + data: pd.DataFrame, + tide_tags: list[str], + bucket: str, + url: str, + org: str, + token: str, + measurement: str = "tide", +): + data = check_and_return_dt_index_df(data) + influx_df_list = [] + for time, row in data.iterrows(): + df = row.reset_index() + df.columns = ["full_index", "_value"] + df[tide_tags] = df["full_index"].str.split("__", expand=True) + df = df.drop("full_index", axis=1) + df.index = pd.to_datetime([time] * df.shape[0]) + influx_df_list.append(df) + + influx_df = pd.concat(influx_df_list).dropna() + + with InfluxDBClient(url=url, token=token, org=org) as client: + with client.write_api() as write_client: + write_client.write( + bucket=bucket, + record=influx_df, + data_frame_measurement_name=measurement, + data_frame_tag_columns=tide_tags, + ) From 4ff05d7800b250f927d476261d1d25ff224880ca Mon Sep 17 00:00:00 2001 From: BaptisteDE Date: Mon, 13 Jan 2025 16:46:40 +0100 Subject: [PATCH 2/6] =?UTF-8?q?=F0=9F=90=9B=20date=20with=20timezone?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tide/influx.py | 120 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 108 insertions(+), 12 deletions(-) diff --git a/tide/influx.py b/tide/influx.py index 08e3e5e..184ff58 100644 --- a/tide/influx.py +++ b/tide/influx.py @@ -1,4 +1,5 @@ import datetime as dt +from zoneinfo import ZoneInfo import pandas as pd from influxdb_client import InfluxDBClient @@ -6,14 +7,17 @@ from tide.utils import check_and_return_dt_index_df -def date_objects_tostring(date): - if isinstance(date, dt.datetime) or isinstance(date, pd.Timestamp): - return date.strftime("%Y-%m-%d %H:%M") - else: - return date +def _date_objects_tostring(date: dt.datetime | pd.Timestamp, tz_info=None): + if date.tzinfo is None: + if tz_info is None: + raise ValueError("tz_info must be provided for naive datetime objects.") + date = date.replace(tzinfo=ZoneInfo(tz_info)) + date_utc = date.astimezone(ZoneInfo("UTC")) + return date_utc.strftime("%Y-%m-%dT%H:%M:%SZ") -def get_influx_data( + +def _single_influx_request( start: str | pd.Timestamp | dt.datetime, stop: str | pd.Timestamp | dt.datetime, bucket: str, @@ -22,14 +26,14 @@ def get_influx_data( url: str, org: str, token: str, -): + tz_info: str = "UTC", +) -> pd.DataFrame: client = InfluxDBClient(url=url, org=org, token=token) - query_api = client.query_api() query = f""" - from(bucket: {bucket}) - |> range(start: {date_objects_tostring(start)}, stop: {date_objects_tostring(stop)}) - |> filter(fn: (r) => r["_measurement"] == {measurement}) + from(bucket: "{bucket}") + |> range(start: {_date_objects_tostring(start, tz_info)}, stop: {_date_objects_tostring(stop, tz_info)}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}") |> map(fn: (r) => ({{r with tide: r.{tide_tags[0]} + "__" + r.{tide_tags[1]} + "__" + r.{tide_tags[2]} + "__" + r.{tide_tags[3]}}})) |> keep(columns: ["_time", "_value", "tide"]) |> pivot(rowKey: ["_time"], columnKey: ["tide"], valueColumn: "_value") @@ -47,10 +51,102 @@ def get_influx_data( df["_time"] = pd.to_datetime(df["_time"]) df.set_index("_time", inplace=True) df.drop(["result", "table"], axis=1, inplace=True) - return df +def get_influx_data( + start: str | pd.Timestamp | dt.datetime, + stop: str | pd.Timestamp | dt.datetime, + bucket: str, + measurement: str, + tide_tags: list[str], + url: str, + org: str, + token: str, + split_td: str | dt.timedelta | pd.Timedelta = None, + tz_info: str = "UTC", + verbose: bool = False, +) -> pd.DataFrame: + + """ + Fetches data from an InfluxDB instance for the specified time range, + bucket, and measurement, optionally splitting the request into smaller time + intervals. + + Parameters + ---------- + start : str, pd.Timestamp, or datetime.datetime + The start of the time range for the query. Can be: + - A relative time string (e.g., "-1d", "-2h"). + - A `pd.Timestamp` or `datetime.datetime` object. + + stop : str, pd.Timestamp, or datetime.datetime + The end of the time range for the query. + Accepts the same formats as `start`. + + bucket : str + The name of the InfluxDB bucket to query data from. + + measurement : str + The _measurement name within the InfluxDB bucket to filter data. + + tide_tags : list[str] + A list of fields or tags in Influx that correspond to Tide tags. + Must be specified in the following order name__unit__bloc__sub_bloc. + + url : str + The URL of the InfluxDB instance (e.g., "http://localhost:8086"). + + org : str + The organization name in the InfluxDB instance. + + token : str + The authentication token for accessing the InfluxDB instance. + + split_td : str, datetime.timedelta, or pd.Timedelta, optional + The time interval for splitting the query into smaller chunks + (e.g., "1d", "12h"). If `None`, the query will not be split. + + tz_info : str, optional + The timezone for interpreting the start and stop times. + Defaults to "UTC". + + verbose : bool, optional + If `True`, prints progress messages for each time chunk being fetched. + Defaults to `False`. + + """ + + if isinstance(start, str) and isinstance(stop, str): + start = dt.datetime.now() + pd.Timedelta(start) + stop = dt.datetime.now() + pd.Timedelta(stop) + + if split_td is not None: + dates_index = pd.date_range(start, stop, freq=split_td) + else: + dates_index = pd.Index([start, stop]) + + df_list = [] + for i in range(len(dates_index) - 1): + if verbose: + print(f"Getting period {i + 1} / {len(dates_index) - 1}") + df_list.append( + _single_influx_request( + start=dates_index[i], + stop=dates_index[i + 1], + bucket=bucket, + measurement=measurement, + tide_tags=tide_tags, + url=url, + org=org, + token=token, + tz_info=tz_info, + ) + ) + + return df_list[0] if len(df_list) == 1 else pd.concat(df_list) + + def push_influx_data( data: pd.DataFrame, tide_tags: list[str], From 9e05ecc36904eb94653c4015f391d846576fcc63 Mon Sep 17 00:00:00 2001 From: BaptisteDE Date: Tue, 14 Jan 2025 09:52:04 +0100 Subject: [PATCH 3/6] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20adapt=20query=20to=20n?= =?UTF-8?q?umber=20of=20used=20tide=20tags?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tide/influx.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tide/influx.py b/tide/influx.py index 184ff58..005f7d3 100644 --- a/tide/influx.py +++ b/tide/influx.py @@ -32,13 +32,20 @@ def _single_influx_request( query_api = client.query_api() query = f""" from(bucket: "{bucket}") - |> range(start: {_date_objects_tostring(start, tz_info)}, stop: {_date_objects_tostring(stop, tz_info)}) + |> range(start: {_date_objects_tostring(start, tz_info)}, + stop: {_date_objects_tostring(stop, tz_info)}) |> filter(fn: (r) => r["_measurement"] == "{measurement}") - |> map(fn: (r) => ({{r with tide: r.{tide_tags[0]} + "__" + r.{tide_tags[1]} + "__" + r.{tide_tags[2]} + "__" + r.{tide_tags[3]}}})) + |> map(fn: (r) => ({{r with tide: r.{tide_tags[0]} + """ + if len(tide_tags) > 1: + for tag in tide_tags[1:]: + query += f' + "__" + r.{tag}' + query += "}))" + query += """ |> keep(columns: ["_time", "_value", "tide"]) |> pivot(rowKey: ["_time"], columnKey: ["tide"], valueColumn: "_value") |> sort(columns: ["_time"]) - """ + """ tables = query_api.query(query) @@ -67,7 +74,6 @@ def get_influx_data( tz_info: str = "UTC", verbose: bool = False, ) -> pd.DataFrame: - """ Fetches data from an InfluxDB instance for the specified time range, bucket, and measurement, optionally splitting the request into smaller time From c7a3dfbdbf3344ad25665c94653df37d9a3db165 Mon Sep 17 00:00:00 2001 From: BaptisteDE Date: Tue, 14 Jan 2025 10:35:03 +0100 Subject: [PATCH 4/6] =?UTF-8?q?=F0=9F=A5=85=20ReadTimeoutError=20+=20retry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tide/influx.py | 47 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/tide/influx.py b/tide/influx.py index 005f7d3..a2024e8 100644 --- a/tide/influx.py +++ b/tide/influx.py @@ -1,5 +1,7 @@ +import time import datetime as dt from zoneinfo import ZoneInfo +from urllib3.exceptions import ReadTimeoutError import pandas as pd from influxdb_client import InfluxDBClient @@ -72,6 +74,8 @@ def get_influx_data( token: str, split_td: str | dt.timedelta | pd.Timedelta = None, tz_info: str = "UTC", + max_retry: int = 5, + waited_seconds_at_retry: int = 5, verbose: bool = False, ) -> pd.DataFrame: """ @@ -121,6 +125,11 @@ def get_influx_data( If `True`, prints progress messages for each time chunk being fetched. Defaults to `False`. + max_retry: int, default 5 + Number of retries for a query in case of ReadTimeoutError. + + waited_seconds_at_retry: int default 5 + Number of seconds waited before re-sending the query """ if isinstance(start, str) and isinstance(stop, str): @@ -136,19 +145,31 @@ def get_influx_data( for i in range(len(dates_index) - 1): if verbose: print(f"Getting period {i + 1} / {len(dates_index) - 1}") - df_list.append( - _single_influx_request( - start=dates_index[i], - stop=dates_index[i + 1], - bucket=bucket, - measurement=measurement, - tide_tags=tide_tags, - url=url, - org=org, - token=token, - tz_info=tz_info, - ) - ) + for attempt in range(max_retry): + try: + df_list.append( + _single_influx_request( + start=dates_index[i], + stop=dates_index[i + 1], + bucket=bucket, + measurement=measurement, + tide_tags=tide_tags, + url=url, + org=org, + token=token, + tz_info=tz_info, + ) + ) + break + except ReadTimeoutError: + if attempt < max_retry - 1: + if verbose: + print(f"Attempt {attempt + 1} failed") + time.sleep(waited_seconds_at_retry) + else: + if verbose: + print("Max retries reached. Unable to get data.") + raise return df_list[0] if len(df_list) == 1 else pd.concat(df_list) From a54124ac5f722fe7833a7304c841add6e685d20f Mon Sep 17 00:00:00 2001 From: BaptisteDE Date: Tue, 14 Jan 2025 12:07:58 +0100 Subject: [PATCH 5/6] =?UTF-8?q?=E2=9C=85=20test=5Fget=5Finflux=5Fdata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_influx.py | 89 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 tests/test_influx.py diff --git a/tests/test_influx.py b/tests/test_influx.py new file mode 100644 index 0000000..ebd6a9d --- /dev/null +++ b/tests/test_influx.py @@ -0,0 +1,89 @@ +import datetime as dt +from unittest.mock import MagicMock, patch + +import pandas as pd + +from tide.influx import get_influx_data + + +def mock_influx_client_query(query): + ref_query = """ + from(bucket: "my_bucket") + |> range(start: 2009-01-01T00:00:00Z, + stop: 2009-01-01T03:00:00Z) + |> filter(fn: (r) => r["_measurement"] == "log") + |> map(fn: (r) => ({r with tide: r.Name + + "__" + r.Unit + "__" + r.bloc})) + |> keep(columns: ["_time", "_value", "tide"]) + |> pivot(rowKey: ["_time"], columnKey: ["tide"], valueColumn: "_value") + |> sort(columns: ["_time"]) + """ + + assert query.strip().replace("\n", "").replace( + " ", "" + ) == ref_query.strip().replace("\n", "").replace(" ", "") + + return [ + MagicMock( + records=[ + MagicMock( + values={ + "_time": "2023-01-01T00:00:00Z", + "temp__°C__bloc1": 0.0, + "hr__%hr__bloc1": 10.0, + "table": 0, + "result": "_result", + } + ), + MagicMock( + values={ + "_time": "2023-01-01T02:00:00Z", + "temp__°C__bloc1": 1.0, + "hr__%hr__bloc1": 20.0, + "table": 0, + "result": "_result", + } + ), + MagicMock( + values={ + "_time": "2023-01-03T01:00:00Z", + "temp__°C__bloc1": 2.0, + "hr__%hr__bloc1": 30.0, + "table": 0, + "result": "_result", + } + ), + ] + ) + ] + + +URL = "https://influx_db.com:3000" +INFLUX_ORG = "my_org" +INFLUX_TOKEN = "my_tok" +TIDE_USED_TAGS = ["Name", "Unit", "bloc"] +BUCKET = "my_bucket" +MEASUREMENT = "log" +START = dt.datetime(2009, 1, 1, 0) +STOP = dt.datetime(2009, 1, 1, 3) + + +class TestInflux: + @patch("influxdb_client.InfluxDBClient.query_api") + def test_get_influx_data(self, mock_query_api): + mock_query_api.return_value.query = MagicMock( + side_effect=mock_influx_client_query + ) + + df = get_influx_data( + START, + STOP, + BUCKET, + MEASUREMENT, + TIDE_USED_TAGS, + URL, + INFLUX_ORG, + INFLUX_TOKEN, + ) + + assert isinstance(df, pd.DataFrame) From 2011e1396c66f22143549a7d5cf645ba0abaecdb Mon Sep 17 00:00:00 2001 From: BaptisteDE Date: Tue, 14 Jan 2025 16:57:41 +0100 Subject: [PATCH 6/6] =?UTF-8?q?=E2=9C=85=20push=5Finflux=5Fdata?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_influx.py | 47 ++++++++++++++++++++++++++++++++++++++- tide/influx.py | 53 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 97 insertions(+), 3 deletions(-) diff --git a/tests/test_influx.py b/tests/test_influx.py index ebd6a9d..01c1331 100644 --- a/tests/test_influx.py +++ b/tests/test_influx.py @@ -3,7 +3,7 @@ import pandas as pd -from tide.influx import get_influx_data +from tide.influx import get_influx_data, push_influx_data def mock_influx_client_query(query): @@ -87,3 +87,48 @@ def test_get_influx_data(self, mock_query_api): ) assert isinstance(df, pd.DataFrame) + + @patch("tide.influx.InfluxDBClient") + def test_push_influx_data(self, mock_influx_client): + mock_write_api = MagicMock() + mock_influx_client.return_value.__enter__.return_value.write_api.return_value.__enter__.return_value = mock_write_api + + data = pd.DataFrame( + { + "name1__°C__bloc1": [1.0, 2.0], + "name2__W__bloc1": [3.0, 4.0], + }, + index=pd.to_datetime(["2009-01-01T00:00:00Z", "2009-01-01T01:00:00Z"]), + ) + + push_influx_data( + data=data, + tide_tags=TIDE_USED_TAGS, + bucket=BUCKET, + url=URL, + org=INFLUX_ORG, + token=INFLUX_TOKEN, + measurement=MEASUREMENT, + ) + + # Assertions to verify interactions with the mock + mock_influx_client.assert_called_once_with( + url=URL, token=INFLUX_TOKEN, org=INFLUX_ORG + ) # Ensure write_api was used + + # Ensure InfluxDBClient was initialized + mock_influx_client.assert_called_once_with( + url="https://influx_db.com:3000", token="my_tok", org="my_org" + ) + + mock_write_api.write.assert_called_once() + call_args = mock_write_api.write.call_args[1] + assert call_args["bucket"] == BUCKET + assert call_args["data_frame_measurement_name"] == MEASUREMENT + assert call_args["data_frame_tag_columns"] == TIDE_USED_TAGS + + written_df = call_args["record"] + assert set(written_df.columns) == {"_value", "Name", "Unit", "bloc"} + assert written_df["_value"].tolist() == [1.0, 3.0, 2.0, 4.0] + assert written_df["Unit"].tolist() == ["°C", "W", "°C", "W"] + assert written_df["bloc"].tolist() == ["bloc1"] * 4 diff --git a/tide/influx.py b/tide/influx.py index a2024e8..bfc5b8e 100644 --- a/tide/influx.py +++ b/tide/influx.py @@ -183,14 +183,63 @@ def push_influx_data( token: str, measurement: str = "tide", ): + """ + Pushes data from a pandas DataFrame to an InfluxDB bucket. + + This function processes a DataFrame indexed by datetime and writes the data + to an InfluxDB bucket. Each row in the DataFrame is expanded based on Tide tags + extracted from a specific column and written to InfluxDB with corresponding + timestamp and tag values. + + Parameters: + data (pd.DataFrame): Input DataFrame with a datetime index and + one or more columns of values. + + tide_tags (list[str]): List of tag names to extract from the + "full_index" column after splitting it. For exemple : ["Name", "Unit", + "bloc", "sub_bloc" + + bucket (str): InfluxDB bucket name where the data will be written. + + url (str): URL of the InfluxDB instance. + + org (str): InfluxDB organization name. + + token (str): Authentication token for the InfluxDB instance. + + measurement (str, optional): Name of the measurement to use in InfluxDB. + Defaults to "tide". + + Raises: + ValueError: If the input `data` is not a DataFrame with a datetime index. + + Example: + >>> data = pd.DataFrame( + { + "name1__°C__bloc1": [1.0, 2.0], + "name2__W__bloc1": [3.0, 4.0], + }, + index=pd.to_datetime(["2009-01-01T00:00:00Z", "2009-01-01T01:00:00Z"]), + ) + + >>> push_influx_data( + data=data, + tide_tags=['Name', 'Unit', "bloc"], + bucket='my-bucket', + url='http://localhost:8086', + org='my-org', + token='my-token' + ) + """ + data = check_and_return_dt_index_df(data) influx_df_list = [] - for time, row in data.iterrows(): + for t, row in data.iterrows(): df = row.reset_index() df.columns = ["full_index", "_value"] df[tide_tags] = df["full_index"].str.split("__", expand=True) df = df.drop("full_index", axis=1) - df.index = pd.to_datetime([time] * df.shape[0]) + df.index = pd.to_datetime([t] * df.shape[0]) influx_df_list.append(df) influx_df = pd.concat(influx_df_list).dropna()