diff --git a/tests/test_plumbing.py b/tests/test_plumbing.py index 8e1167e..079e818 100644 --- a/tests/test_plumbing.py +++ b/tests/test_plumbing.py @@ -1,7 +1,6 @@ import pandas as pd - import numpy as np - +import pytest from tide.plumbing import ( _get_pipe_from_proc_list, _get_column_wise_transformer, @@ -13,127 +12,182 @@ pio.renderers.default = "browser" -TEST_DF = pd.DataFrame( - { - "Tin__°C__building": [10.0, 20.0, 30.0], - "Text__°C__outdoor": [-1.0, 5.0, 4.0], - "radiation__W/m2__outdoor": [50, 100, 400], - "Humidity__%HR": [10, 15, 13], - "Humidity__%HR__room1": [20, 30, 50], - "Humidity_2": [10, 15, 13], - "light__DIMENSIONLESS__building": [100, 200, 300], - "mass_flwr__m3/h__hvac": [300, 500, 600], - }, - index=pd.date_range("2009", freq="h", periods=3, tz="UTC"), -) -TEST_DF_2 = pd.DataFrame( - { - "a__°C__zone_1": np.random.randn(24), - "b__°C__zone_1": np.random.randn(24), - "c__Wh__zone_2": np.random.randn(24) * 100, - }, - index=pd.date_range("2009", freq="h", periods=24, tz="UTC"), -) +@pytest.fixture +def time_index(): + """Create a standard time index for test data.""" + return pd.date_range("2009-01-01", freq="h", periods=24, tz="UTC") -TEST_DF_2["c__Wh__zone_2"] = abs(TEST_DF_2).cumsum()["c__Wh__zone_2"] - -TEST_DF_2.loc["2009-01-01 05:00:00":"2009-01-01 09:00:00", "a__°C__zone_1"] = np.nan -TEST_DF_2.loc["2009-01-01 15:00:00", "b__°C__zone_1"] = np.nan -TEST_DF_2.loc["2009-01-01 17:00:00", "b__°C__zone_1"] = np.nan -TEST_DF_2.loc["2009-01-01 20:00:00", "c__Wh__zone_2"] = np.nan - -PIPE_DICT = { - "pre_processing": { - "°C": [["ReplaceThreshold", {"upper": 25}]], - "W/m2__outdoor": [["DropTimeGradient", {"upper_rate": -100}]], - }, - "common": [["Interpolate", ["linear"]], ["Ffill"], ["Bfill", {"limit": 3}]], - "resampling": [["Resample", ["3h", "mean", {"W/m2": "sum"}]]], - "compute_energy": [ - [ - "ExpressionCombine", - [ - { - "T1": "Tin__°C__building", - "T2": "Text__°C__outdoor", - "m": "mass_flwr__m3/h__hvac", - }, - "(T1 - T2) * m * 1004 * 1.204", - "Air_flow_energy__hvac__J", - True, - ], - ] - ], -} +@pytest.fixture +def basic_data(time_index): + """Create basic test data with various units and tags.""" + return pd.DataFrame( + { + "Tin__°C__building": np.random.randn(24) * 5 + 20, + "Text__°C__outdoor": np.random.randn(24) * 3 + 10, + "radiation__W/m2__outdoor": np.abs(np.random.randn(24)) * 100, + "Humidity__%HR": np.random.randn(24) * 5 + 50, + "Humidity__%HR__room1": np.random.randn(24) * 5 + 45, + "Humidity_2": np.random.randn(24) * 5 + 55, + "light__DIMENSIONLESS__building": np.abs(np.random.randn(24)) * 200, + "mass_flwr__m3/h__hvac": np.abs(np.random.randn(24)) * 400 + 500, + }, + index=time_index, + ) -class TestPlumbing: - def test__get_all_data_step(self): - test_df = TEST_DF.copy() - test_df.iloc[1, 0] = np.nan - test_df.iloc[0, 1] = np.nan - pipe = _get_pipe_from_proc_list(test_df.columns, PIPE_DICT["common"], tz="UTC") - res = pipe.fit_transform(test_df) +@pytest.fixture +def gapped_data(time_index): + """Create test data with specific gaps for testing gap-related functionality.""" + data = pd.DataFrame( + { + "a__°C__zone_1": np.random.randn(24), + "b__°C__zone_1": np.random.randn(24), + "c__Wh__zone_2": np.abs(np.random.randn(24) * 100), + }, + index=time_index, + ) - pd.testing.assert_series_equal( - res["Tin__°C__building"], TEST_DF["Tin__°C__building"] - ) - assert float(res.iloc[0, 1]) == 5.0 + # Add cumulative sum to energy data + data["c__Wh__zone_2"] = data["c__Wh__zone_2"].cumsum() - def test__get_column_wise_transformer(self): - col_trans = _get_column_wise_transformer( - proc_dict=PIPE_DICT["pre_processing"], - data_columns=TEST_DF.columns, - tz="UTC", - process_name="test", + # Add specific gaps + data.loc["2009-01-01 05:00":"2009-01-01 09:00", "a__°C__zone_1"] = np.nan # 5h gap + data.loc["2009-01-01 15:00", "b__°C__zone_1"] = np.nan # 1h gap + data.loc["2009-01-01 17:00", "b__°C__zone_1"] = np.nan # 1h gap + data.loc["2009-01-01 20:00", "c__Wh__zone_2"] = np.nan # 1h gap + + return data + + +@pytest.fixture +def pipe_dict(): + """Create a standard pipeline dictionary for testing.""" + return { + "pre_processing": { + "°C": [["ReplaceThreshold", {"upper": 25}]], + "W/m2__outdoor": [["DropTimeGradient", {"upper_rate": -100}]], + }, + "common": [["Interpolate", ["linear"]], ["Ffill"], ["Bfill", {"limit": 3}]], + "resampling": [["Resample", ["3h", "mean", {"W/m2": "sum"}]]], + "compute_energy": [ + [ + "ExpressionCombine", + [ + { + "T1": "Tin__°C__building", + "T2": "Text__°C__outdoor", + "m": "mass_flwr__m3/h__hvac", + }, + "(T1 - T2) * m * 1004 * 1.204", + "Air_flow_energy__hvac__J", + True, + ], + ] + ], + } + + +class TestPipelineComponents: + """Tests for individual pipeline components and transformers.""" + + def test_pipe_from_proc_list(self, pipe_dict): + """Test creation and application of processing pipeline from list.""" + test_df = pd.DataFrame( + { + "temp__°C__building": [10.0, np.nan, 20.0, 30.0], + "humid__%HR__building": [50.0, 60.0, np.nan, 80.0], + }, + index=pd.date_range("2009", freq="h", periods=4, tz="UTC"), ) - res = col_trans.fit_transform(TEST_DF.copy()) + pipe = _get_pipe_from_proc_list(test_df.columns, pipe_dict["common"], tz="UTC") + result = pipe.fit_transform(test_df) - np.testing.assert_array_equal(res.iloc[:, 0].to_list(), [10.0, 20.0, np.nan]) - np.testing.assert_array_equal(res.iloc[:, 2].to_list(), [50.0, 100.0, np.nan]) + # Check that gaps were filled with interpolation + assert not result.isna().any().any() + # For temp: 10 -> [15] -> 20 -> 30 (linear interpolation) + assert result.iloc[1]["temp__°C__building"] == pytest.approx(15.0) + # For humid: 50 -> 60 -> [70] -> 80 (linear interpolation) + assert result.iloc[2]["humid__%HR__building"] == pytest.approx(70.0) - col_trans = _get_column_wise_transformer( - proc_dict=PIPE_DICT["pre_processing"], - data_columns=TEST_DF[ - [col for col in TEST_DF.columns if col != "radiation__W/m2__outdoor"] - ].columns, + # Check that non-gap values remain unchanged + assert result.iloc[0]["temp__°C__building"] == 10.0 + assert result.iloc[3]["temp__°C__building"] == 30.0 + assert result.iloc[0]["humid__%HR__building"] == 50.0 + assert result.iloc[1]["humid__%HR__building"] == 60.0 + + def test_column_wise_transformer(self, pipe_dict): + """Test column-wise transformer creation and application.""" + # Create controlled test data with known values + test_df = pd.DataFrame( + { + "temp1__°C__zone1": [24.0, 26.0, np.nan, 28.0], + # Two values above threshold + "temp2__°C__zone2": [23.0, 25.0, 27.0, np.nan], + # One value above threshold + "radiation__W/m2__outdoor": [100, 200, 50, 150], # For gradient test + "humid__%HR__zone1": [50.0, 60.0, 70.0, 80.0], # Should be unaffected + }, + index=pd.date_range("2009", freq="h", periods=4, tz="UTC"), + ) + + # Test with all columns + transformer = _get_column_wise_transformer( + proc_dict=pipe_dict["pre_processing"], + data_columns=test_df.columns, tz="UTC", process_name="test", ) + result = transformer.fit_transform(test_df.copy()) - res = col_trans.fit_transform( - TEST_DF[ - [col for col in TEST_DF.columns if col != "radiation__W/m2__outdoor"] - ].copy() - ) + # Check temperature threshold applied (excluding NaN) + temp1_mask = ~pd.isna(result["temp1__°C__zone1"]) + temp2_mask = ~pd.isna(result["temp2__°C__zone2"]) + assert (result["temp1__°C__zone1"][temp1_mask] <= 25).all() + assert (result["temp2__°C__zone2"][temp2_mask] <= 25).all() - np.testing.assert_array_equal(res.iloc[:, 0].to_list(), [10.0, 20.0, np.nan]) - assert len(col_trans.transformers_) == 2 + # Verify specific values + assert result.iloc[0]["temp1__°C__zone1"] == 24.0 # Unchanged + assert result.iloc[1]["temp2__°C__zone2"] == 25.0 # Capped + assert pd.isna(result.iloc[2]["temp1__°C__zone1"]) # NaN preserved + assert pd.isna(result.iloc[3]["temp1__°C__zone1"]) # Capped - cols_none = [ - "Humidity__%HR", - "Humidity__%HR__room1", - "Humidity_2", - "light__DIMENSIONLESS__building", - "mass_flwr__m3/h__hvac", - ] + # Check radiation gradient (should drop when rate < -100) + assert pd.isna( + result.iloc[2]["radiation__W/m2__outdoor"] + ) # Dropped due to steep negative gradient - col_trans = _get_column_wise_transformer( - proc_dict=PIPE_DICT["pre_processing"], - data_columns=cols_none, + # Check humidity unaffected + pd.testing.assert_series_equal( + result["humid__%HR__zone1"], test_df["humid__%HR__zone1"] + ) + + # Test with subset of columns (temperature only) + temp_cols = [col for col in test_df.columns if "°C" in col] + transformer = _get_column_wise_transformer( + proc_dict=pipe_dict["pre_processing"], + data_columns=temp_cols, tz="UTC", process_name="test", ) + assert len(transformer.transformers_) == 1 # Only temperature transformer - assert col_trans is None + # Test with no matching columns + humidity_cols = [col for col in test_df.columns if "%HR" in col] + transformer = _get_column_wise_transformer( + proc_dict=pipe_dict["pre_processing"], + data_columns=humidity_cols, + tz="UTC", + process_name="test", + ) + assert transformer is None # No transformers needed - def test_get_pipeline_from_dict(self): + def test_pipeline_from_dict(self, gapped_data): + """Test creation of full pipeline from dictionary configuration.""" pipe_dict = { "fill_1": {"a__°C__zone_1": [["Interpolate"]]}, - # "fill_2": {"b": [["Interpolate"]]}, "combine": [ [ "ExpressionCombine", @@ -148,43 +202,169 @@ def test_get_pipeline_from_dict(self): ], ] ], - "fill_3": [["Interpolate"]], + "fill_final": [["Interpolate"]], } - pipe = get_pipeline_from_dict(TEST_DF_2.columns, pipe_dict, verbose=True) - pipe.fit_transform(TEST_DF_2.copy()) + pipe = get_pipeline_from_dict(gapped_data.columns, pipe_dict, verbose=True) + result = pipe.fit_transform(gapped_data.copy()) - assert True + # Check new column created + assert "new_unit__°C²__zone_1" in result.columns - def test_plumber(self): - pipe = { - "fill_1": {"a__°C__zone_1": [["Interpolate"]]}, - "fill_2": {"b": [["Interpolate"]]}, - "combine": { - "zone_1": [ - [ - "ExpressionCombine", - [ - { - "T1": "a__°C__zone_1", - "T2": "b__°C__zone_1", - }, - "T1 * T2", - "new_unit__°C²__zone_1", - True, - ], - ] - ], + # Check gaps filled + assert not result.isna().any().any() + + +class TestPlumber: + """Tests for the Plumber class functionality.""" + + def test_initialization(self, gapped_data, pipe_dict): + """Test Plumber initialization and basic attributes.""" + plumber = Plumber(gapped_data, pipe_dict) + assert plumber.data is not None + assert plumber.root is not None + assert plumber.pipe_dict == pipe_dict + + def test_data_selection(self, gapped_data): + """Test data selection using tags.""" + plumber = Plumber(gapped_data) + + # Test unit selection + temp_cols = plumber.select("°C") + assert len(temp_cols) == 2 + assert all("°C" in col for col in temp_cols) + + # Test zone selection + zone_1_cols = plumber.select("zone_1") + assert len(zone_1_cols) == 2 + assert all("zone_1" in col for col in zone_1_cols) + + def test_pipeline_execution(self, basic_data, pipe_dict): + """Test pipeline execution with different step selections.""" + plumber = Plumber(basic_data, pipe_dict) + + # Test full pipeline + full_pipe = plumber.get_pipeline() + assert len(full_pipe.steps) > 0 + + # Test partial pipeline + partial_pipe = plumber.get_pipeline(steps=["pre_processing"]) + assert len(partial_pipe.steps) == 1 + + # Test with no pipeline + identity_pipe = plumber.get_pipeline(steps=None) + assert len(identity_pipe.steps) == 1 + assert identity_pipe.steps[0][0] == "Identity" + + def test_corrected_data(self, basic_data, pipe_dict): + """Test data correction through pipeline.""" + plumber = Plumber(basic_data, pipe_dict) + + # Test with time slice + result = plumber.get_corrected_data( + start="2009-01-01 05:00", stop="2009-01-01 10:00" + ) + assert len(result) == 3 + + +class TestGapsDescription: + """Tests for gap analysis functionality.""" + + @pytest.fixture + def gaps_data(self, time_index): + """Create data with specific gaps for testing gap analysis.""" + data = pd.DataFrame( + { + "temp__°C__Building": np.ones(24), + "humidity__%__Building": np.ones(24), + "power__W__Building": np.ones(24), }, - "fill_3": [["Interpolate"]], - } + index=time_index, + ) + + # Create gaps of different durations + data.loc["2009-01-01 02:00":"2009-01-01 04:00", "temp__°C__Building"] = np.nan + data.loc["2009-01-01 08:00", "temp__°C__Building"] = np.nan + data.loc["2009-01-01 12:00":"2009-01-01 14:00", "humidity__%__Building"] = ( + np.nan + ) + data.loc["2009-01-01 06:00":"2009-01-01 18:00", "power__W__Building"] = np.nan + + return data + + def test_basic_gaps_description(self, gaps_data): + """Test basic gap analysis functionality.""" + plumber = Plumber(gaps_data) + result = plumber.get_gaps_description() + + # Check structure + assert all(col in result.columns for col in gaps_data.columns) + expected_stats = [ + "data_presence_%", + "count", + "mean", + "std", + "min", + "25%", + "50%", + "75%", + "max", + ] + assert all(stat in result.index for stat in expected_stats) + + # Check specific values + temp_col = "temp__°C__Building" + assert result[temp_col]["count"] == 2 + assert result[temp_col]["data_presence_%"] == pytest.approx(83.33, rel=1e-2) - plumber = Plumber() - plumber.set_data(TEST_DF_2) - plumber.pipe_dict = pipe - plumber.get_pipeline() - plumber.get_pipeline(steps=["fill_3", "combine"]) + def test_gap_thresholds(self, gaps_data): + """Test gap analysis with duration thresholds.""" + plumber = Plumber(gaps_data) - plumber.plot() + # Test minimum duration threshold + result = plumber.get_gaps_description(gaps_gte="3h") + assert result["temp__°C__Building"]["count"] == 1 + assert result["power__W__Building"]["count"] == 1 - assert True + # Test maximum duration threshold + result = plumber.get_gaps_description(gaps_lte="2h") + assert result["temp__°C__Building"]["count"] == 1 + assert "power__W__Building" not in result.columns + + def test_gap_analysis_edge_cases(self, time_index): + """Test gap analysis edge cases.""" + # Test with no gaps + clean_data = pd.DataFrame({"temp__°C__Building": np.ones(24)}, index=time_index) + plumber = Plumber(clean_data) + result = plumber.get_gaps_description() + assert result.empty + + # Test with invalid selection + result = plumber.get_gaps_description(select="nonexistent") + assert result.empty + + # Test single point gap + data = clean_data.copy() + data.loc[data.index[12], "temp__°C__Building"] = np.nan + plumber = Plumber(data) + result = plumber.get_gaps_description() + assert result["temp__°C__Building"]["count"] == 1 + assert pd.Timedelta(result["temp__°C__Building"]["mean"]) == pd.Timedelta("1h") + + +class TestPlotting: + """Tests for plotting functionality.""" + + def test_basic_plot(self, gapped_data): + """Test basic plotting functionality.""" + plumber = Plumber(gapped_data) + fig = plumber.plot() + + # Check figure was created + assert fig is not None + # Check data is present in figure + assert len(fig.data) > 0 + # Check all columns are plotted + assert all( + col in [trace.name for trace in fig.data] for col in gapped_data.columns + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index d7a6b82..9461b9c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -13,7 +13,9 @@ parse_request_to_col_names, timedelta_to_int, NamedList, - get_series_bloc, + _get_series_bloc, + get_blocks_lte_and_gte, + get_blocks_mask_lte_and_gte, edit_tag_value_by_level, ) @@ -125,7 +127,7 @@ def test_get_series_bloc(self): toy_holes.loc["2009-01-01 05:00:00":"2009-01-01 08:00:00"] = np.nan toy_holes.loc["2009-01-01 12:00:00":"2009-01-01 16:00:00"] = np.nan - get_series_bloc( + _get_series_bloc( toy_holes, is_null=True, upper_td_threshold="3h", @@ -133,15 +135,15 @@ def test_get_series_bloc(self): ) # All data groups - assert len(get_series_bloc(toy_holes)) == 4 + assert len(_get_series_bloc(toy_holes)) == 4 # All gaps groups - assert len(get_series_bloc(toy_holes, is_null=True)) == 3 + assert len(_get_series_bloc(toy_holes, is_null=True)) == 3 # Gaps Inner bounds, one inclusive assert ( len( - get_series_bloc( + _get_series_bloc( toy_holes, is_null=True, select_inner=True, @@ -157,7 +159,7 @@ def test_get_series_bloc(self): # Gaps outer selection, one inclusive assert ( len( - get_series_bloc( + _get_series_bloc( toy_holes, is_null=True, select_inner=False, @@ -175,7 +177,7 @@ def test_get_series_bloc(self): [np.nan, 1, 2, np.nan, 3, 4, np.nan], index=pd.date_range("2009", freq="h", periods=7, tz="UTC"), ) - res = get_series_bloc(ser, is_null=True) + res = _get_series_bloc(ser, is_null=True) assert len(res) == 3 # No gaps case @@ -183,7 +185,7 @@ def test_get_series_bloc(self): [0.0, 1.0, 2.0, 2.5, 3, 4, 5.0], index=pd.date_range("2009", freq="h", periods=7, tz="UTC"), ) - res = get_series_bloc(ser, is_null=True) + res = _get_series_bloc(ser, is_null=True) assert res == [] @@ -192,7 +194,7 @@ def test_get_series_bloc(self): [0.0, 1.0, 2.0, np.nan, 3, 4, 5.0], index=pd.date_range("2009", freq="h", periods=7, tz="UTC"), ) - res = get_series_bloc(ser, is_null=True) + res = _get_series_bloc(ser, is_null=True) assert len(res) == 1 @@ -259,6 +261,98 @@ def test_get_data_blocks(self): ) assert res["data_1"] == [] + def test_get_blocks_lte_and_gte(self): + toy_df = pd.DataFrame( + {"data_1": np.random.randn(24), "data_2": np.random.randn(24)}, + index=pd.date_range("2009-01-01", freq="h", periods=24, tz="UTC"), + ) + + toy_df.loc["2009-01-01 01:00:00", "data_1"] = np.nan + toy_df.loc["2009-01-01 10:00:00":"2009-01-01 12:00:00", "data_1"] = np.nan + toy_df.loc["2009-01-01 15:00:00":"2009-01-01 23:00:00", "data_2"] = np.nan + + res = get_blocks_lte_and_gte(toy_df, "1h30min", "8h", True) + assert len(res["data_1"]) == 1 and len(res["data_2"]) == 1 + + res = get_blocks_lte_and_gte(toy_df, lte="8h", gte="1h30min", is_null=True) + assert len(res["data_1"]) == 1 and len(res["data_2"]) == 0 + + def test_get_blocks_mask_lte_and_gte(self): + toy_df = pd.DataFrame( + {"data_1": np.random.randn(24), "data_2": np.random.randn(24)}, + index=pd.date_range("2009-01-01", freq="h", periods=24, tz="UTC"), + ) + + toy_df.loc["2009-01-01 01:00:00", "data_1"] = np.nan + toy_df.loc["2009-01-01 10:00:00":"2009-01-01 12:00:00", "data_1"] = np.nan + toy_df.loc["2009-01-01 15:00:00":"2009-01-01 23:00:00", "data_2"] = np.nan + + res = get_blocks_mask_lte_and_gte(toy_df, "1h30min", "8h", True) + np.testing.assert_array_equal( + res.values, + np.array( + [ + [False, False], + [True, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, True], + [False, True], + [False, True], + [False, True], + [False, True], + [False, True], + [False, True], + [False, True], + [False, True], + ] + ), + ) + + res = get_blocks_mask_lte_and_gte(toy_df, lte="8h", gte="1h30min", is_null=True) + np.testing.assert_array_equal( + res.values, + np.array( + [ + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [True, False], + [True, False], + [True, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + [False, False], + ] + ), + ) + def test_outer_timestamps(self): ref_index = pd.date_range("2009-01-01", freq="d", periods=5, tz="UTC") idx = pd.date_range("2009-01-02", freq="d", periods=2, tz="UTC") diff --git a/tide/base.py b/tide/base.py index 3e66543..52693d7 100644 --- a/tide/base.py +++ b/tide/base.py @@ -4,7 +4,6 @@ import typing from abc import ABC, abstractmethod -import numpy as np import pandas as pd from sklearn.base import TransformerMixin, BaseEstimator @@ -16,10 +15,11 @@ timedelta_to_int, validate_odd_param, process_stl_odd_args, - get_data_blocks, + get_blocks_lte_and_gte, get_idx_freq_delta_or_min_time_interval, get_tags_max_level, NAME_LEVEL_MAP, + get_blocks_mask_lte_and_gte, ) from tide.meteo import get_oikolab_df @@ -205,39 +205,20 @@ def __init__( self.gaps_gte = gaps_gte def get_gaps_dict_to_fill(self, X: pd.Series | pd.DataFrame): - X = check_and_return_dt_index_df(X) - lower_th, upper_th = self.gaps_lte, self.gaps_gte - select_inner = False - - if lower_th is not None and upper_th is not None: - if pd.to_timedelta(lower_th) > pd.to_timedelta(upper_th): - lower_th, upper_th = upper_th, lower_th - select_inner = True - - return get_data_blocks( + return get_blocks_lte_and_gte( X, is_null=True, - select_inner=select_inner, - lower_td_threshold=lower_th, - upper_td_threshold=upper_th, - upper_threshold_inclusive=True, - lower_threshold_inclusive=True, - return_combination=False, + lte=self.gaps_lte, + gte=self.gaps_gte, ) def get_gaps_mask(self, X: pd.Series | pd.DataFrame): - gaps_dict = self.get_gaps_dict_to_fill(X) - mask_data = {} - - for col, idx_list in gaps_dict.items(): - if idx_list: - combined_idx = pd.concat([idx.to_series() for idx in idx_list]).index - mask_data[col] = X.index.isin(combined_idx) - else: - mask_data[col] = np.zeros(X.shape[0], dtype=bool) - - df_mask = pd.DataFrame(mask_data, index=X.index) - return df_mask + return get_blocks_mask_lte_and_gte( + X, + is_null=True, + lte=self.gaps_lte, + gte=self.gaps_gte, + ) class BaseOikoMeteo: diff --git a/tide/plumbing.py b/tide/plumbing.py index 115c5ec..8842ab4 100644 --- a/tide/plumbing.py +++ b/tide/plumbing.py @@ -14,6 +14,8 @@ get_data_level_values, get_tree_depth_from_level, NamedList, + get_blocks_lte_and_gte, + get_blocks_mask_lte_and_gte, ) from tide.plot import ( plot_gaps_heatmap, @@ -118,7 +120,53 @@ def get_pipeline_from_dict( class Plumber: + """A class for managing and transforming time series data through configurable processing pipelines. + + The Plumber class provides a high-level interface for: + - Managing time series data with hierarchical column naming (name__unit__bloc__sub_bloc) + - Creating and executing data processing pipelines + - Analyzing and visualizing data gaps + - Plotting time series with customizable layouts + + The class uses a tree structure to organize data columns based on their tags, + allowing for flexible data selection and manipulation. + + Attributes + ---------- + data : pd.DataFrame + The input time series data with datetime index + root : Node + Root node of the tree structure organizing column names + pipe_dict : dict + Configuration dictionary defining the processing pipeline steps + + Examples + -------- + >>> data = pd.DataFrame( + ... { + ... "temp__°C__zone1": [20, 21, np.nan, 23], + ... "humid__%HR__zone1": [50, 55, 60, np.nan], + ... }, + ... index=pd.date_range("2023", freq="h", periods=4), + ... ) + >>> pipe_dict = { + ... "pre_processing": {"°C": [["ReplaceThreshold", {"upper": 25}]]}, + ... "common": [["Interpolate", ["linear"]]], + ... } + >>> plumber = Plumber(data, pipe_dict) + >>> corrected = plumber.get_corrected_data() + """ + def __init__(self, data: pd.Series | pd.DataFrame = None, pipe_dict: dict = None): + """ + Parameters + ---------- + data : pd.Series or pd.DataFrame, optional + Input time series data. Must have a datetime index. + pipe_dict : dict, optional + Pipeline configuration dictionary. Each key represents a processing step + and contains the corresponding transformation parameters. + """ self.data = check_and_return_dt_index_df(data) if data is not None else None self.root = data_columns_to_tree(data.columns) if data is not None else None self.pipe_dict = pipe_dict @@ -144,13 +192,122 @@ def show( steps: None | str | list[str] | slice = slice(None), depth_level: int | str = None, ): + """Display the tree structure of selected data columns at selected steps for + a given depth level. + + Parameters + ---------- + select : str or pd.Index or list[str], optional + Data selection using tide's tag system + steps : None or str or list[str] or slice, default slice(None) + Pipeline steps to apply before showing the tree + depth_level : int or str, optional + Maximum depth level to display in the tree + """ pipe = self.get_pipeline(select=select, steps=steps) loc_tree = data_columns_to_tree(pipe.get_feature_names_out()) if depth_level is not None: depth_level = get_tree_depth_from_level(loc_tree.max_depth, depth_level) loc_tree.show(max_depth=depth_level) + def get_gaps_description( + self, + select: str | pd.Index | list[str] = None, + steps: None | str | list[str] | slice = slice(None), + verbose: bool = False, + gaps_lte: str | pd.Timedelta | dt.timedelta = None, + gaps_gte: str | pd.Timedelta | dt.timedelta = None, + return_combination: bool = True, + ) -> pd.DataFrame: + """ + Get statistical description of gaps durations in the data. + + Parameters + ---------- + select : str or pd.Index or list[str], optional + Data selection using tide's tag system + steps : None or str or list[str] or slice, default slice(None) + Pipeline steps to apply before analyzing gaps + verbose : bool, default False + Whether to print information about pipeline steps + gaps_lte : str or pd.Timedelta or dt.timedelta, optional + Upper threshold for gap duration + gaps_gte : str or pd.Timedelta or dt.timedelta, optional + Lower threshold for gap duration + return_combination : bool, default True + Whether to include statistics for gaps present in any column + + Returns + ------- + pd.DataFrame + DataFrame containing statistics about gap durations for each column. + Statistics include: + - data_presence_%: percentage of non-gap data points + - count: number of gaps + - mean: average gap duration + - std: standard deviation of gap durations + - min: shortest gap + - 25%: first quartile + - 50%: median + - 75%: third quartile + - max: longest gap + Empty DataFrame if no gaps are found. + """ + data = self.get_corrected_data(select, steps=steps, verbose=verbose) + + # Get gaps and calculate durations + gaps_dict = get_blocks_lte_and_gte( + data=data, + lte=gaps_lte, + gte=gaps_gte, + is_null=True, + return_combination=return_combination, + ) + + gap_durations = {} + for col, gaps_list in gaps_dict.items(): + if not gaps_list: + continue + + durations = [] + for gap in gaps_list: + if len(gap) > 1: + durations.append(gap[-1] - gap[0]) + else: + durations.append(pd.to_timedelta(gap.freq)) + + if durations: + gap_durations[col] = pd.Series(durations, name=col) + + if not gap_durations: + return pd.DataFrame() + + stats_df = pd.concat([ser.describe() for ser in gap_durations.values()], axis=1) + + gaps_mask = get_blocks_mask_lte_and_gte( + data=data, + lte=gaps_lte, + gte=gaps_gte, + is_null=True, + return_combination=return_combination, + ) + + presence_percentages = (1 - gaps_mask.mean()) * 100 + + stats_df.loc["data_presence_%"] = presence_percentages[stats_df.columns] + row_order = ["data_presence_%"] + [ + idx for idx in stats_df.index if idx != "data_presence_%" + ] + return stats_df.reindex(row_order) + def set_data(self, data: pd.Series | pd.DataFrame): + """Set new data for the Plumber instance. + + Parameters + ---------- + data : pd.Series or pd.DataFrame + New time series data to process. Must have a datetime index. + """ self.data = check_and_return_dt_index_df(data) self.root = data_columns_to_tree(data.columns) @@ -158,6 +315,20 @@ def select( self, select: str | pd.Index | list[str] = None, ): + """Select columns based on tags. + + Parameters + ---------- + select : str or pd.Index or list[str], optional + Selection criteria using tide's tag system. + Can be a unit (e.g., "°C"), location (e.g., "zone_1"), + or any other tag in the column names. + + Returns + ------- + pd.Index + Selected column names + """ return parse_request_to_col_names(self.data, select) def get_pipeline( @@ -166,6 +337,22 @@ def get_pipeline( steps: None | str | list[str] | slice = slice(None), verbose: bool = False, ) -> Pipeline: + """Create a scikit-learn pipeline from the configuration. + + Parameters + ---------- + select : str or pd.Index or list[str], optional + Data selection using tide's tag system + steps : None or str or list[str] or slice, default slice(None) + Pipeline steps to include. If None, returns an Identity transformer. + verbose : bool, default False + Whether to print information about pipeline steps + + Returns + ------- + Pipeline + Scikit-learn pipeline configured with the selected steps + """ if self.data is None: raise ValueError("data is required to build a pipeline") selection = parse_request_to_col_names(self.data, select) @@ -188,6 +375,26 @@ def get_corrected_data( steps: None | str | list[str] | slice = slice(None), verbose: bool = False, ) -> pd.DataFrame: + """Apply pipeline transformations to selected data. + + Parameters + ---------- + select : str or pd.Index or list[str], optional + Data selection using tide's tag system + start : str or datetime or Timestamp, optional + Start time for data slice + stop : str or datetime or Timestamp, optional + End time for data slice + steps : None or str or list[str] or slice, default slice(None) + Pipeline steps to apply + verbose : bool, default False + Whether to print information about pipeline steps + + Returns + ------- + pd.DataFrame + Transformed data + """ if self.data is None: raise ValueError("Cannot get corrected data. data are missing") select = parse_request_to_col_names(self.data, select) @@ -207,6 +414,30 @@ def plot_gaps_heatmap( title: str = None, verbose: bool = False, ): + """Create a heatmap visualization of data gaps. + + Parameters + ---------- + select : str or pd.Index or list[str], optional + Data selection using tide's tag system + start : str or datetime or Timestamp, optional + Start time for visualization + stop : str or datetime or Timestamp, optional + End time for visualization + steps : None or str or list[str] or slice, default slice(None) + Pipeline steps to apply before visualization + time_step : str or Timedelta or timedelta, optional + Time step for aggregating gaps + title : str, optional + Plot title + verbose : bool, default False + Whether to print information about pipeline steps + + Returns + ------- + go.Figure + Plotly figure object containing the heatmap + """ data = self.get_corrected_data(select, start, stop, steps, verbose) return plot_gaps_heatmap(data, time_step=time_step, title=title) @@ -236,6 +467,68 @@ def plot( y_title_standoff: int | float = 5, verbose: bool = False, ): + """Create an interactive time series plot. + + Creates a highly customizable plot that can show: + - Multiple time series with automatic different y-axes based on unit + - Two different versions of the data (e.g., raw and processed) + - Data gaps visualization + - Custom styling and layout + + Parameters + ---------- + select : str or pd.Index or list[str], optional + Data selection using tide's tag system + start : str or datetime or Timestamp, optional + Start time for plot + stop : str or datetime or Timestamp, optional + End time for plot + y_axis_level : str, optional + Tag level to use for y-axis grouping + y_tag_list : list[str], optional + List of tags for custom y-axis ordering + steps : None or str or list[str] or slice, default slice(None) + Pipeline steps to apply for main data + data_mode : str, default "lines" + Plot mode for main data ("lines", "markers", or "lines+markers") + steps_2 : None or str or list[str] or slice, optional + Pipeline steps to apply for secondary data + data_2_mode : str, default "markers" + Plot mode for secondary data + markers_opacity : float, default 0.8 + Opacity for markers + lines_width : float, default 2.0 + Width of plot lines + title : str, optional + Plot title + plot_gaps : bool, default False + Whether to highlight gaps in main data + gaps_lower_td : str or Timedelta or timedelta, optional + Minimum duration for gap highlighting + gaps_rgb : tuple[int, int, int], default (31, 73, 125) + RGB color for main data gaps + gaps_alpha : float, default 0.5 + Opacity for main data gaps + plot_gaps_2 : bool, default False + Whether to highlight gaps in secondary data + gaps_2_lower_td : str or Timedelta or timedelta, optional + Minimum duration for secondary data gap highlighting + gaps_2_rgb : tuple[int, int, int], default (254, 160, 34) + RGB color for secondary data gaps + gaps_2_alpha : float, default 0.5 + Opacity for secondary data gaps + axis_space : float, default 0.03 + Space between multiple y-axes + y_title_standoff : int or float, default 5 + Distance between y-axis title and axis + verbose : bool, default False + Whether to print information about pipeline steps + + Returns + ------- + go.Figure + Plotly figure object containing the plot + """ # A bit dirty. Here we assume that if you ask a selection # that is not found in original data columns, it is because it # has not yet been computed (using ExpressionCombine processor diff --git a/tide/utils.py b/tide/utils.py index b250d35..0ec969c 100644 --- a/tide/utils.py +++ b/tide/utils.py @@ -245,7 +245,7 @@ def _upper_bound(series, bound, bound_inclusive: bool, inner: bool): return op(series, bound) -def get_series_bloc( +def _get_series_bloc( date_series: pd.Series, is_null: bool = False, select_inner: bool = True, @@ -314,16 +314,114 @@ def get_series_bloc( ] +def get_blocks_lte_and_gte( + data: pd.Series | pd.DataFrame, + lte: str | dt.timedelta = None, + gte: str | dt.timedelta = None, + is_null: bool = False, + return_combination: bool = False, +): + """ + Get blocks of data ore gaps (nan) based on duration thresholds. + + Returns them in a dictionary as list of DateTimeIndex. The keys values are + data columns (or name if data is a Series). + + + Parameters: + ----------- + data : pd.Series or pd.DataFrame + The input data to be processed. + lte : str or datetime.timedelta, optional + The upper time threshold. Can be a string (e.g., '1h') or a timedelta object. + gte : str or datetime.timedelta, optional + The lower time threshold. Can be a string (e.g., '30min') or a timedelta object. + is_null : bool, default False + Whether to select blocks where the data is null. + + Notes: + ------ + - If both `lte` and `gte` are provided, and `lte` is smaller than `gte`, they + will be swapped. The function determines whether to select data within or outside + the boundaries based on the order of thresholds. + return_combination : bool, optional + If True (default), a combination column is created that checks for NaNs + across all columns in the DataFrame. Gaps in this combination column represent + rows where NaNs are present in any of the columns. + """ + + lower_th, upper_th = lte, gte + select_inner = False + if lower_th is not None and upper_th is not None: + if pd.to_timedelta(lower_th) > pd.to_timedelta(upper_th): + lower_th, upper_th = upper_th, lower_th + select_inner = True + + return get_data_blocks( + data=data, + is_null=is_null, + lower_td_threshold=lower_th, + upper_td_threshold=upper_th, + select_inner=select_inner, + return_combination=return_combination, + ) + + +def get_blocks_mask_lte_and_gte( + data: pd.Series | pd.DataFrame, + lte: str | dt.timedelta = None, + gte: str | dt.timedelta = None, + is_null: bool = False, + return_combination: bool = False, +) -> pd.DataFrame: + """ + Creates a boolean mask DataFrame indicating the location of data blocks or gaps. + + Parameters + ---------- + data : pd.Series or pd.DataFrame + The input time series data with a DateTime index + lte : str or timedelta, optional + The minimum duration threshold + gte : str or timedelta, optional + The maximum duration threshold + is_null : bool, default False + Whether to find NaN blocks (True) or valid data blocks (False) + return_combination : bool, optional + If True (default), a combination column is created that checks for NaNs + across all columns in the DataFrame. Gaps in this combination column represent + rows where NaNs are present in any of the columns. + + Returns + ------- + pd.DataFrame + Boolean mask DataFrame with same index as input data and columns + corresponding to the input data columns. True values indicate + the presence of a block matching the criteria. + """ + gaps_dict = get_blocks_lte_and_gte(data, lte, gte, is_null, return_combination) + + mask_data = {} + for col, idx_list in gaps_dict.items(): + if idx_list: + combined_idx = pd.concat([idx.to_series() for idx in idx_list]).index + mask_data[col] = data.index.isin(combined_idx) + else: + mask_data[col] = np.zeros(data.shape[0], dtype=bool) + + return pd.DataFrame(mask_data, index=data.index) + + def get_data_blocks( data: pd.Series | pd.DataFrame, is_null: bool = False, cols: str | list[str] = None, - select_inner: bool = True, lower_td_threshold: str | dt.timedelta = None, upper_td_threshold: str | dt.timedelta = None, + select_inner: bool = True, lower_threshold_inclusive: bool = True, upper_threshold_inclusive: bool = True, - return_combination=True, + return_combination: bool = True, ): """ Identifies groups of valid data if is_null = False, or groups of nan if @@ -350,10 +448,9 @@ def get_data_blocks( Whether to return groups with valid data, or groups of Nan values (is_null = True) cols : str or list[str], optional - The columns in the DataFrame for which to detect gaps. If None (default), all - columns are considered. - select_inner : Bool, default True - Select the groups of data inside or outside the given boundaries + Columns to analyze. If None, uses all columns. + select_inner : bool, default True + If True, select groups within thresholds. If False, select groups outside thresholds. lower_td_threshold : str or timedelta, optional The minimum duration of a period for it to be considered valid. Can be passed as a string (e.g., '1d' for one day) or a `timedelta`. @@ -381,17 +478,12 @@ def get_data_blocks( timestamps where the values in the corresponding column were NaN and exceeded the gap threshold. """ - data = check_and_return_dt_index_df(data) + cols = ensure_list(cols) or list(data.columns) - if isinstance(cols, str): - cols = [cols] - elif cols is None: - cols = list(data.columns) - - idx_dict = {} - for col in cols: - idx_dict[col] = get_series_bloc( + # Process each column + idx_dict = { + col: _get_series_bloc( data[col], is_null, select_inner, @@ -400,9 +492,11 @@ def get_data_blocks( lower_threshold_inclusive, upper_threshold_inclusive, ) + for col in cols + } if return_combination: - idx_dict["combination"] = get_series_bloc( + idx_dict["combination"] = _get_series_bloc( ~data.isnull().any(axis=1), is_null, select_inner,