29 changes: 15 additions & 14 deletions tests/test_plumbing.py
@@ -160,27 +160,28 @@ def test_plumber(self):
pipe = {
"fill_1": {"a__°C__zone_1": [["Interpolate"]]},
"fill_2": {"b": [["Interpolate"]]},
"combine": [
[
"ExpressionCombine",
"combine": {
"zone_1": [
[
{
"T1": "a__°C__zone_1",
"T2": "b__°C__zone_1",
},
"T1 * T2",
"new_unit__°C²__zone_1",
True,
],
]
],
"ExpressionCombine",
[
{
"T1": "a__°C__zone_1",
"T2": "b__°C__zone_1",
},
"T1 * T2",
"new_unit__°C²__zone_1",
True,
],
]
],
},
"fill_3": [["Interpolate"]],
}

plumber = Plumber()
plumber.set_data(TEST_DF_2)
plumber.pipe_dict = pipe

plumber.get_pipeline()
plumber.get_pipeline(steps=["fill_3", "combine"])

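With this change, the "combine" entry of the pipe dict is keyed by bloc ("zone_1" above) instead of being a bare step list, and get_pipeline accepts a steps selection. A minimal sketch of that usage, assuming Plumber is importable from tide.plumbing and using a small stand-in for TEST_DF_2:

import pandas as pd

from tide.plumbing import Plumber

# Stand-in for TEST_DF_2: two series tagged name__unit__bloc (assumed columns)
df = pd.DataFrame(
    {"a__°C__zone_1": [1.0, None, 3.0], "b__°C__zone_1": [2.0, 4.0, None]},
    index=pd.date_range("2009-01-01", periods=3, freq="h", tz="UTC"),
)

pipe_dict = {
    "fill_1": {"a__°C__zone_1": [["Interpolate"]]},
    "combine": {
        # bloc name -> list of processing steps
        "zone_1": [
            [
                "ExpressionCombine",
                [
                    {"T1": "a__°C__zone_1", "T2": "b__°C__zone_1"},
                    "T1 * T2",
                    "new_unit__°C²__zone_1",
                    True,
                ],
            ]
        ]
    },
}

plumber = Plumber()
plumber.set_data(df)
plumber.pipe_dict = pipe_dict

plumber.get_pipeline()                   # all configured steps
plumber.get_pipeline(steps=["combine"])  # only a subset of the steps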
14 changes: 7 additions & 7 deletions tests/test_utils.py
@@ -9,12 +9,12 @@
get_outer_timestamps,
data_columns_to_tree,
get_data_col_names_from_root,
get_data_level_names,
get_data_level_values,
parse_request_to_col_names,
timedelta_to_int,
NamedList,
get_series_bloc,
edit_tag_name_by_level,
edit_tag_value_by_level,
)

DF_COLUMNS = pd.DataFrame(
@@ -33,15 +33,15 @@
class TestUtils:
def test_edit_tag_name_by_level(self):
col_name = "temp__°C__bloc1"
new_name = edit_tag_name_by_level(col_name, 0, "temp_1")
new_name = edit_tag_value_by_level(col_name, 0, "temp_1")

assert new_name == "temp_1__°C__bloc1"

with pytest.raises(
ValueError,
match=r"Cannot edit tag name at level index 3. Columns have only 3 tag levels.",
):
edit_tag_name_by_level(col_name, 3, "temp_1")
edit_tag_value_by_level(col_name, 3, "temp_1")

def test_named_list(self):
test = NamedList(["a", "b", "c", "d"])
@@ -96,7 +96,7 @@ def test_parse_request_to_col_names(self):

def test_get_data_level_names(self):
root = data_columns_to_tree(DF_COLUMNS.columns)
res = get_data_level_names(root, "name")
res = get_data_level_values(root, "name")
assert res == [
"name_1",
"name_1",
@@ -107,10 +107,10 @@ def test_get_data_level_names(self):
"name4",
]

res = get_data_level_names(root, "unit")
res = get_data_level_values(root, "unit")
assert res == ["°C", "DIMENSIONLESS", "kWh/m²", "kWh"]

res = get_data_level_names(root, "bloc")
res = get_data_level_values(root, "bloc")
assert res == ["bloc1", "bloc2", "OTHER", "bloc4"]

def test_get_series_bloc(self):
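The renamed helper behaves as before; the level argument is the tag index, with 0 addressing the series name:

from tide.utils import edit_tag_value_by_level

# Level 0 is the name part; unit and bloc sit at levels 1 and 2
edit_tag_value_by_level("temp__°C__bloc1", 0, "temp_1")  # -> 'temp_1__°C__bloc1'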
14 changes: 8 additions & 6 deletions tide/base.py
@@ -19,7 +19,8 @@
get_data_blocks,
get_idx_freq_delta_or_min_time_interval,
ensure_list,
get_tag_levels,
get_tags_max_level,
NAME_LEVEL_MAP,
)

from tide.meteo import get_oikolab_df
@@ -74,18 +74,19 @@ def fit_check_features(self, X):
self.check_required_features(X)
self.feature_names_in_ = list(X.columns)

def get_set_tags_values_columns(self, X, tag_level: int, value: str):
nb_tags = get_tag_levels(X.columns)
if tag_level > nb_tags - 1:
def get_set_tags_values_columns(self, X, level: int | str, value: str):
nb_tags = get_tags_max_level(X.columns)
level = NAME_LEVEL_MAP[level] if isinstance(level, str) else level
if level > nb_tags:
raise ValueError(
f"Asking for level {tag_level} tag (indexing from 0). "
f"Asking for level {level} tag (indexing from 0). "
f"Only {nb_tags} tags found in columns"
)

new_columns = []
for col in X.columns:
parts = col.split("__")
parts[tag_level] = value
parts[level] = value
new_columns.append("__".join(parts))

return new_columns
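For illustration only (made-up column names), this is the level resolution get_set_tags_values_columns now performs: a string level goes through NAME_LEVEL_MAP, then the matching part of every column name is rewritten:

from tide.utils import NAME_LEVEL_MAP, get_tags_max_level

cols = ["temp__°C__bloc1", "power__kW__bloc2"]
level = NAME_LEVEL_MAP["unit"]  # "unit" -> 1
assert level <= get_tags_max_level(cols)

new_cols = []
for col in cols:
    parts = col.split("__")
    parts[level] = "DIMENSIONLESS"  # new tag value written at the unit level
    new_cols.append("__".join(parts))
# new_cols == ['temp__DIMENSIONLESS__bloc1', 'power__DIMENSIONLESS__bloc2']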
4 changes: 2 additions & 2 deletions tide/plot.py
@@ -8,7 +8,7 @@
check_and_return_dt_index_df,
parse_request_to_col_names,
data_columns_to_tree,
get_data_level_names,
get_data_level_values,
get_data_blocks,
get_outer_timestamps,
)
@@ -56,7 +56,7 @@ def get_cols_axis_maps_and_labels(
root = data_columns_to_tree(columns)
if root.max_depth >= 3:
level = y_axis_level if y_axis_level else "unit"
y_tags = get_data_level_names(root, level)
y_tags = get_data_level_values(root, level)
else:
return {col: {"yaxis": "y"} for col in columns}, {"y": columns}, columns

26 changes: 16 additions & 10 deletions tide/plumbing.py
@@ -11,7 +11,8 @@
parse_request_to_col_names,
check_and_return_dt_index_df,
data_columns_to_tree,
get_data_level_names,
get_data_level_values,
get_tree_depth_from_level,
NamedList,
)
from tide.plot import (
@@ -92,7 +93,8 @@ def get_pipeline_from_dict(
verbose: bool = False,
):
if pipe_dict is None:
return Pipeline([("Identity", pc.Identity())], verbose=verbose)
pipe = Pipeline([("Identity", pc.Identity())], verbose=verbose)
return pipe.fit(_dummy_df(data_columns, "UTC"))
else:
steps_list = []
step_columns = data_columns.copy()
@@ -129,20 +131,24 @@ def __repr__(self):
rep_str += f"Number of tags : {tree_depth - 2} \n"
for tag in range(1, tree_depth - 1):
rep_str += f"=== {tag_levels[tag]} === \n"
for lvl_name in get_data_level_names(self.root, tag_levels[tag]):
for lvl_name in get_data_level_values(self.root, tag_levels[tag]):
rep_str += f"{lvl_name}\n"
rep_str += "\n"
return rep_str
else:
return super().__repr__()

def show(self, steps: None | str | list[str] | slice = slice(None)):
if steps is None:
if self.root is not None:
self.root.show()
elif self.data is not None:
pipe = self.get_pipeline(steps=steps)
data_columns_to_tree(pipe.get_feature_names_out()).show()
def show(
self,
select: str | pd.Index | list[str] = None,
steps: None | str | list[str] | slice = slice(None),
depth_level: int | str = None,
):
pipe = self.get_pipeline(select=select, steps=steps)
loc_tree = data_columns_to_tree(pipe.get_feature_names_out())
if depth_level is not None:
depth_level = get_tree_depth_from_level(loc_tree.max_depth, depth_level)
loc_tree.show(max_depth=depth_level)

def set_data(self, data: pd.Series | pd.DataFrame):
self.data = check_and_return_dt_index_df(data)
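A sketch of the extended show signature, reusing the plumber built in the earlier sketch; depth_level accepts a level name or index and is translated through get_tree_depth_from_level:

plumber.show()                                       # full tag tree of the pipeline output
plumber.show(steps=["combine"], depth_level="unit")  # truncate the tree at the unit level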
137 changes: 79 additions & 58 deletions tide/utils.py
@@ -16,12 +16,81 @@
# To 3 levels of tags unit__bloc_sub_bloc

LEVEL_FORMAT = {
1: lambda pt: f"DATA__{pt[0]}",
2: lambda pt: f"DATA__{pt[1]}__{pt[0]}",
3: lambda pt: f"DATA__{pt[2]}__{pt[1]}__{pt[0]}",
4: lambda pt: f"DATA__{pt[2]}__{pt[3]}__{pt[1]}__{pt[0]}",
0: lambda pt: f"DATA__{pt[0]}",
1: lambda pt: f"DATA__{pt[1]}__{pt[0]}",
2: lambda pt: f"DATA__{pt[2]}__{pt[1]}__{pt[0]}",
3: lambda pt: f"DATA__{pt[2]}__{pt[3]}__{pt[1]}__{pt[0]}",
}

LEVEL_NAME_MAP = {0: "name", 1: "unit", 2: "bloc", 3: "sub_bloc"}
NAME_LEVEL_MAP = {name: level for level, name in LEVEL_NAME_MAP.items()}

TREE_LEVEL_NAME_MAP = {
5: {"name": 4, "unit": 3, "bloc": 1, "sub_bloc": 2},
4: {"name": 3, "unit": 2, "bloc": 1},
3: {"name": 2, "unit": 1},
2: {"name": 1},
}


def get_tree_depth_from_level(tree_max_depth: int, level: int | str):
level = LEVEL_NAME_MAP[level] if isinstance(level, int) else level
if tree_max_depth not in TREE_LEVEL_NAME_MAP:
raise ValueError(
f"Unsupported root depth of {tree_max_depth}. Allowed depths are 2 to 5."
)

level_indices = TREE_LEVEL_NAME_MAP[tree_max_depth]

if level not in level_indices:
raise ValueError(
f"Unknown level {level}. Allowed levels are{level_indices.keys()}"
)

return level_indices[level]


def get_data_level_values(data_root, level: int | str):
"""
Return a list of strings containing the tag values at the specified level.
Warning: bloc, unit and sub_bloc values are returned without duplicates.
:param data_root: big tree root
:param level: int or string corresponding to tag level
:return: list of values
"""
level = LEVEL_NAME_MAP[level] if isinstance(level, int) else level
tree_level = get_tree_depth_from_level(data_root.max_depth, level)

nodes = [
[node.name for node in node_group]
for node_group in levelordergroup_iter(data_root)
]

selected_nodes = nodes[tree_level]

if level in ["bloc", "unit", "sub_bloc"]:
# Return list with no duplicates
return list(dict.fromkeys(selected_nodes))
else:
return selected_nodes


def get_tags_max_level(data_columns: pd.Index | list[str]) -> int:
"""
Return the maximum tag level used in the data column names.
:param data_columns: DataFrame columns holding time series names with tags
"""
return max(len(col.split("__")) - 1 for col in data_columns)


def edit_tag_value_by_level(col_name: str, level: int | str, new_tag_name: str) -> str:
level = NAME_LEVEL_MAP[level] if isinstance(level, str) else level
parts = col_name.split("__")
if level > len(parts) - 1:
raise ValueError(
f"Cannot edit tag name at level index {level}. Columns have only {len(parts)} tag levels."
)
parts[level] = new_tag_name
return "__".join(parts)


class NamedList:
def __init__(self, elements: list):
@@ -49,24 +118,6 @@ def get_added_removed_col(original_idx: list | pd.Index, new_idx: list | pd.Inde
return added_columns, removed_columns


def get_tag_levels(data_columns: pd.Index | list[str]) -> int:
"""
Returns max number of used tags from data columns names
:param data_columns: DataFrame columns holding time series names with tags
"""
return max(len(col.split("__")) for col in data_columns)


def edit_tag_name_by_level(col_name: str, tag_level: int, new_tag_name: str) -> str:
parts = col_name.split("__")
if tag_level > len(parts) - 1:
raise ValueError(
f"Cannot edit tag name at level index {tag_level}. Columns have only {len(parts)} tag levels."
)
parts[tag_level] = new_tag_name
return "__".join(parts)


def col_name_tag_enrichment(col_name: str, tag_levels: int) -> str:
"""
Enriches a column name by adding default tags until it reaches the specified
@@ -98,38 +149,6 @@ def get_data_col_names_from_root(data_root):
][-1]


def get_data_level_names(data_root, level: str):
depth_levels = {
5: {"name": 4, "unit": 3, "bloc": 1, "sub_bloc": 2},
4: {"name": 3, "unit": 2, "bloc": 1},
3: {"name": 2, "unit": 1},
2: {"name": 1},
}

max_depth = data_root.max_depth
if max_depth not in depth_levels:
raise ValueError(
f"Unsupported root depth of {max_depth}. Allowed depths are 2 to 5."
)

level_indices = depth_levels[max_depth]

if level not in level_indices:
raise ValueError(f"Unknown level {level}")

nodes = [
[node.name for node in node_group]
for node_group in levelordergroup_iter(data_root)
]

selected_nodes = nodes[level_indices[level]]

if level in {"bloc", "unit", "sub_bloc"}:
return list(dict.fromkeys(selected_nodes))
else:
return selected_nodes


def parse_request_to_col_names(
data_columns: pd.Index | list[str], request: str | pd.Index | list[str] = None
) -> list[str]:
@@ -150,7 +169,7 @@ def parse_request_to_col_names(
)

full_tag_col_map = {
col_name_tag_enrichment(col, get_tag_levels(data_columns)): col
col_name_tag_enrichment(col, get_tags_max_level(data_columns)): col
for col in data_columns
}

@@ -181,10 +200,12 @@ def data_columns_to_tree(columns: pd.Index | list[str]) -> T:
data time series. Names should follow the "name__unit__bloc_sub_bloc"
naming convention
"""
tag_levels = get_tag_levels(columns)
tag_levels = get_tags_max_level(columns)

if not 1 <= tag_levels <= 4:
raise ValueError(f"Only up to 4 tags are allowed; found {tag_levels}.")
if not 0 <= tag_levels <= 3:
raise ValueError(
f"Only up to 4 tags are allowed; found tag level {tag_levels}."
)

parsed_dict = {}
for col in columns:
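Taken together, the renamed and added helpers read tag levels straight from column names; a minimal sketch with illustrative columns, expected values shown as comments:

from tide.utils import (
    data_columns_to_tree,
    get_data_level_values,
    get_tags_max_level,
    get_tree_depth_from_level,
)

cols = ["t_in__°C__bloc1", "t_out__°C__bloc2", "power__kW__bloc1"]

get_tags_max_level(cols)                           # 2 -> name (0), unit (1), bloc (2)

root = data_columns_to_tree(cols)                  # DATA / bloc / unit / name
get_tree_depth_from_level(root.max_depth, "unit")  # 2
get_data_level_values(root, "unit")                # ['°C', 'kW'] (duplicates dropped)
get_data_level_values(root, "name")                # ['t_in', 'power', 't_out']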