From 365dc6029af29bd331308b8a7a29a03a9f0d1c66 Mon Sep 17 00:00:00 2001
From: Engelsgeduld
Date: Mon, 16 Jun 2025 12:41:38 +0300
Subject: [PATCH] Fix: data leak in DateRangeFilled and GroupByDate, unify DateRangeFilled for datasets

---
Review notes (not part of the commit message; text in this area is ignored
when the patch is applied):

* This copy was re-flowed from a version whose line breaks had been collapsed.
  Header fields, hunk headers and every added/removed line are reproduced as
  found. Python indentation and the position of blank context lines are
  inferred (from syntax and from the hunk line counts); in the second and
  third series_comp.py hunks one blank context line each is a best guess.
  Verify against the blobs named in the "index" lines before applying.
* DateRangeFilledTransformer.transform() raises NotFittedError whenever
  first_values_ is None, but fit() assigns first_values_ only when
  fixed_columns_ is non-empty. A fitted transformer whose fill_config covers
  every column except key/date/mark will therefore always raise. The
  _is_fitted_ flag set by fit() is not consulted by transform().
* test_separator_ is annotated Optional[pd.DatetimeIndex], yet fit() assigns
  the result of train_data["date"].max(), which is a scalar.
* pd.concat(..., ignore_index=True) already renumbers the index, so the
  trailing .reset_index(drop=True) has no further effect.

 .../date_transformers/series_comp.py          | 81 +++++++++++++------
 .../first_special_pipeline/pipeline.py        |  4 +-
 .../second_special_pipeline/pipeline.py       |  6 +-
 .../second_special_pipeline/preprocessing.py  | 44 +---------
 4 files changed, 63 insertions(+), 72 deletions(-)

diff --git a/src/special_preprocessing/date_transformers/series_comp.py b/src/special_preprocessing/date_transformers/series_comp.py
index f30aa21..d4a5453 100644
--- a/src/special_preprocessing/date_transformers/series_comp.py
+++ b/src/special_preprocessing/date_transformers/series_comp.py
@@ -1,23 +1,27 @@
-from typing import Optional
+from typing import Any, Optional
 
 import numpy as np
 import pandas as pd
 from sklearn.base import BaseEstimator, TransformerMixin
+from sklearn.exceptions import NotFittedError
 
 
 class GroupByDateTransformer(BaseEstimator, TransformerMixin):
+    def __init__(self) -> None:
+        self._is_fitted_: bool = False
+
     def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> "GroupByDateTransformer":
-        self._is_fitted_: bool = True
+        self._is_fitted_ = True
         return self
 
     def transform(self, X: pd.DataFrame) -> pd.DataFrame:
-        special_columns = ["discount", "price", "discount.1", "key", "date", "ship"]
+        special_columns = ["price", "discount.1", "key", "date", "ship"]
         other = [col for col in X.columns if col not in special_columns]
         new_data = pd.DataFrame()
         keys = X["key"].unique()
         for key in keys:
             df_key = X[X["key"] == key]
-            grouped = df_key.groupby(["key", "date"], as_index=False).agg(
+            grouped = df_key.groupby(["key", "date", "mark"], as_index=False).agg(
                 {
                     **{col: "max" for col in other},
                     "ship": "sum",
@@ -30,22 +34,40 @@ def transform(self, X: pd.DataFrame) -> pd.DataFrame:
 
 
 class DateRangeFilledTransformer(BaseEstimator, TransformerMixin):
-    def __init__(self) -> None:
+    def __init__(self, fill_config: dict[str, Any]) -> None:
         self._is_fitted_: bool = False
+        self.test_separator_: Optional[pd.DatetimeIndex] = None
+        self.statistics_: Optional[dict] = None
+        self.fixed_columns_: Optional[list[str]] = None
+        self.first_values_: Optional[pd.DataFrame] = None
+        self.fill_config = fill_config
 
     def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> "DateRangeFilledTransformer":
+        train_data = X[X["mark"] == "train"].copy()
+        self.test_separator_ = train_data["date"].max()
+        self.statistics_ = {}
+        cols_to_agg = {col: method for col, method in self.fill_config.items() if isinstance(method, str)}
+        if cols_to_agg:
+            aggregated_stats = train_data.groupby("key").agg(cols_to_agg)
+            for col in aggregated_stats.columns:
+                self.statistics_[col] = aggregated_stats[col]
+        special_cols = ["key", "date", "mark"] + list(self.fill_config.keys())
+        self.fixed_columns_ = [col for col in X.columns if col not in special_cols]
+
+        if self.fixed_columns_:
+            self.first_values_ = train_data.groupby("key")[self.fixed_columns_].first()
+
         self._is_fitted_ = True
         return self
 
     def transform(self, X: pd.DataFrame) -> pd.DataFrame:
-        test_separator = X[X["mark"] == "train"]["date"].max()
-        special_columns = ["discount", "price", "discount.1", "key", "date", "ship", "mark"]
-        other = [col for col in X.columns if col not in special_columns]
-
+        if self.test_separator_ is None or self.statistics_ is None or self.first_values_ is None:
+            raise NotFittedError()
         date_range = pd.date_range(X["date"].min(), X["date"].max(), freq="D")
 
-        missing_data = []
         unique_keys = X["key"].unique()
+        missing_data_list = []
+
         for key in unique_keys:
             product_data = X[X["key"] == key]
             existing_dates = list(set(product_data["date"]))
@@ -53,22 +75,29 @@ def transform(self, X: pd.DataFrame) -> pd.DataFrame:
             if missing_dates.empty:
                 continue
 
-            new_dt = pd.DataFrame(
-                {
-                    "date": missing_dates,
-                    "ship": 0,
-                    "discount": 0,
-                    "discount.1": 0,
-                    "price": product_data["price"].mean(),
-                    "key": key,
-                    "mark": np.where(missing_dates <= test_separator, "train", "test"),
-                }
-            )
-            for col in other:
-                new_dt[col] = product_data[col].iloc[0]
-            missing_data.append(new_dt)
+            new_rows_data = {
+                "date": missing_dates,
+                "key": key,
+                "mark": np.where(missing_dates <= self.test_separator_, "train", "test"),
+            }
+
+            for col, method in self.fill_config.items():
+                if isinstance(method, (int, float)):
+                    new_rows_data[col] = method
+                elif isinstance(method, str):
+                    value = self.statistics_[col].get(key, 0)
+                    new_rows_data[col] = value
+            if self.fixed_columns_:
+                for col in self.fixed_columns_:
+                    if key in self.first_values_.index:
+                        value = self.first_values_.loc[key, col]
+                    else:
+                        value = product_data[col].iloc[0]
+                    new_rows_data[col] = value
+
+            missing_data_list.append(pd.DataFrame(new_rows_data))
 
-        if missing_data:
-            X = pd.concat([X] + missing_data, ignore_index=True)
+        if missing_data_list:
+            X = pd.concat([X] + missing_data_list, ignore_index=True).reset_index(drop=True)
 
         return X
diff --git a/src/special_preprocessing/first_special_pipeline/pipeline.py b/src/special_preprocessing/first_special_pipeline/pipeline.py
index 309ddf4..d6ffeb7 100644
--- a/src/special_preprocessing/first_special_pipeline/pipeline.py
+++ b/src/special_preprocessing/first_special_pipeline/pipeline.py
@@ -40,10 +40,12 @@ def preprocessing() -> Pipeline:
     return pipline
 
 
+fill_strategy = {"price": "mean", "ship": 0, "discount": 0, "discount.1": 0}
+
 preprocessing_pipeline = Pipeline(
     steps=[
         ("base preprocessing", preprocessing()),
-        ("fill_data_range", DateRangeFilledTransformer()),
+        ("fill_data_range", DateRangeFilledTransformer(fill_config=fill_strategy)),
         ("grouping", GroupByDateTransformer()),
         ("features extraction", FeatureExtractionTransformer()),
         ("decomposition", SeriesDecompositionTransformer()),
diff --git a/src/special_preprocessing/second_special_pipeline/pipeline.py b/src/special_preprocessing/second_special_pipeline/pipeline.py
index c51d8dc..b3d1690 100644
--- a/src/special_preprocessing/second_special_pipeline/pipeline.py
+++ b/src/special_preprocessing/second_special_pipeline/pipeline.py
@@ -1,21 +1,23 @@
 from sklearn.pipeline import Pipeline
 
 from src.special_preprocessing.date_transformers.features_extraction import FeatureExtractionTransformer
+from src.special_preprocessing.date_transformers.series_comp import DateRangeFilledTransformer
 from src.special_preprocessing.date_transformers.series_decomposition import Separation, SeriesDecompositionTransformer
 from src.special_preprocessing.second_special_pipeline.preprocessing import (
     CategoricalFeaturesTransform,
-    DateRangeFilledTransformerSec,
     DiscountTransformer,
     KeyTransformer,
     RenameColumns,
 )
 
+fill_strategy = {"ship": 0, "discount": 0, "discount.1": 0}
+
 preprocessing_pipeline = Pipeline(
     steps=[
         ("rename", RenameColumns()),
         ("key", KeyTransformer()),
         ("discount", DiscountTransformer()),
-        ("fill_data_range", DateRangeFilledTransformerSec()),
+        ("fill_data_range", DateRangeFilledTransformer(fill_config=fill_strategy)),
         ("categorical_features_prep", CategoricalFeaturesTransform()),
         ("features extraction", FeatureExtractionTransformer()),
         ("decomposition", SeriesDecompositionTransformer()),
diff --git a/src/special_preprocessing/second_special_pipeline/preprocessing.py b/src/special_preprocessing/second_special_pipeline/preprocessing.py
index 16dc33f..3747990 100644
--- a/src/special_preprocessing/second_special_pipeline/preprocessing.py
+++ b/src/special_preprocessing/second_special_pipeline/preprocessing.py
@@ -39,7 +39,7 @@ def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> "DiscountTra
         return self
 
     def transform(self, X: pd.DataFrame) -> pd.DataFrame:
-        X["discount"] = X["discount.1"].apply(lambda x: True if x > 0 else False)
+        X["discount"] = X["discount.1"].apply(lambda x: 1 if x > 0 else 0)
         return X
 
 
@@ -71,45 +71,3 @@ def transform(self, X: pd.DataFrame) -> pd.DataFrame:
         if self._transformer is None:
             raise NotFittedError()
         return self._transformer.transform(X)
-
-
-class DateRangeFilledTransformerSec(BaseEstimator, TransformerMixin):
-    def __init__(self) -> None:
-        self._is_fitted_: bool = False
-
-    def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> "DateRangeFilledTransformerSec":
-        self._is_fitted_ = True
-        return self
-
-    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
-        test_separator = X[X["mark"] == "train"]["date"].max()
-        special_columns = ["discount", "price", "discount.1", "key", "date", "ship", "mark"]
-        other = [col for col in X.columns if col not in special_columns]
-
-        date_range = pd.date_range(X["date"].min(), X["date"].max(), freq="D")
-        missing_data = []
-        unique_keys = X["key"].unique()
-        for key in unique_keys:
-            product_data = X[X["key"] == key]
-            existing_dates = list(set(product_data["date"]))
-            missing_dates = date_range.difference(existing_dates)
-            if missing_dates.empty:
-                continue
-            mark_col = np.where(missing_dates <= test_separator, "train", "test")
-            new_dt = pd.DataFrame(
-                {
-                    "date": missing_dates,
-                    "ship": [0] * len(missing_dates),
-                    "discount": [0] * len(missing_dates),
-                    "discount.1": [0] * len(missing_dates),
-                    "key": [key] * len(missing_dates),
-                    "mark": mark_col,
-                }
-            )
-            for col in other:
-                new_dt[col] = product_data[col].iloc[0]
-            missing_data.append(new_dt)
-
-        if missing_data:
-            X = pd.concat([X] + missing_data, ignore_index=True)
-        return X