Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 55 additions & 27 deletions src/special_preprocessing/date_transformers/series_comp.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,103 @@
from typing import Optional
from typing import Any, Optional

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.exceptions import NotFittedError


class GroupByDateTransformer(BaseEstimator, TransformerMixin):
    """Collapse duplicate rows so each (key, date, mark) appears once.

    For every product ``key`` the rows sharing the same ``date`` and
    ``mark`` are aggregated: ``ship`` is summed, ``price`` and
    ``discount.1`` are averaged, and every other column takes its max.
    """

    def __init__(self) -> None:
        # fit() is stateless; the flag only records that it was called.
        self._is_fitted_: bool = False

    def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> "GroupByDateTransformer":
        """Stateless fit: mark the transformer as fitted and return self."""
        self._is_fitted_ = True
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return a new frame with one aggregated row per (key, date, mark).

        Raises KeyError if ``key``/``date``/``mark`` or any of the columns
        named in the aggregation spec are missing from ``X``.
        """
        # Columns with dedicated aggregation rules. "mark" is listed here
        # because it is a group key and must NOT also appear in the agg
        # spec (pandas rejects aggregating a grouping column).
        special_columns = ["price", "discount.1", "key", "date", "ship", "mark"]
        other = [col for col in X.columns if col not in special_columns]
        agg_spec = {
            **{col: "max" for col in other},
            "ship": "sum",
            "discount.1": "mean",
            "price": "mean",
        }
        # Collect per-key results and concatenate once at the end instead of
        # concatenating inside the loop (which is quadratic).
        parts = []
        for key in X["key"].unique():
            df_key = X[X["key"] == key]
            parts.append(df_key.groupby(["key", "date", "mark"], as_index=False).agg(agg_spec))
        if not parts:
            return pd.DataFrame()
        return pd.concat(parts, ignore_index=True)


class DateRangeFilledTransformer(BaseEstimator, TransformerMixin):
    """Insert rows for calendar days missing from each key's daily series.

    ``fill_config`` maps column name -> fill rule for the inserted rows:

    * a number -> used as a constant fill value;
    * a string -> a pandas aggregation name (e.g. ``"mean"``) computed per
      key on the training slice (``mark == "train"``) during ``fit``.

    Columns not named in ``fill_config`` (and not ``key``/``date``/``mark``)
    are treated as fixed per-key attributes and copied from the first
    training row of that key.
    """

    def __init__(self, fill_config: dict[str, Any]) -> None:
        self._is_fitted_: bool = False
        # Latest training date; inserted rows after it are marked "test".
        self.test_separator_: Optional[pd.Timestamp] = None
        # Per-key statistics for string-valued fill rules: {col: Series indexed by key}.
        self.statistics_: Optional[dict] = None
        # Columns copied verbatim per key onto inserted rows.
        self.fixed_columns_: Optional[list[str]] = None
        # First training row per key for the fixed columns (None when there
        # are no fixed columns -- a valid fitted state).
        self.first_values_: Optional[pd.DataFrame] = None
        self.fill_config = fill_config

    def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> "DateRangeFilledTransformer":
        """Learn per-key fill statistics from the training slice of ``X``."""
        train_data = X[X["mark"] == "train"].copy()
        self.test_separator_ = train_data["date"].max()
        self.statistics_ = {}
        # Only string-valued rules require learned statistics; numeric rules
        # are constants and need no fitting.
        cols_to_agg = {col: method for col, method in self.fill_config.items() if isinstance(method, str)}
        if cols_to_agg:
            aggregated_stats = train_data.groupby("key").agg(cols_to_agg)
            for col in aggregated_stats.columns:
                self.statistics_[col] = aggregated_stats[col]
        special_cols = ["key", "date", "mark"] + list(self.fill_config.keys())
        self.fixed_columns_ = [col for col in X.columns if col not in special_cols]
        if self.fixed_columns_:
            self.first_values_ = train_data.groupby("key")[self.fixed_columns_].first()
        self._is_fitted_ = True
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Append filler rows for every missing (key, day) combination.

        Raises NotFittedError if ``fit`` has not been called.
        """
        # Guard on the fitted flag rather than on first_values_: first_values_
        # legitimately stays None when the input has no fixed columns, and
        # requiring it would make a correctly fitted transformer raise.
        if not self._is_fitted_ or self.test_separator_ is None or self.statistics_ is None:
            raise NotFittedError()
        date_range = pd.date_range(X["date"].min(), X["date"].max(), freq="D")
        missing_data_list = []

        for key in X["key"].unique():
            product_data = X[X["key"] == key]
            missing_dates = date_range.difference(set(product_data["date"]))
            if missing_dates.empty:
                continue

            new_rows_data = {
                "date": missing_dates,
                "key": key,
                # Days up to the last training date belong to the train split.
                "mark": np.where(missing_dates <= self.test_separator_, "train", "test"),
            }
            for col, method in self.fill_config.items():
                if isinstance(method, (int, float)):
                    new_rows_data[col] = method
                elif isinstance(method, str):
                    # Fall back to 0 for keys unseen during fit.
                    new_rows_data[col] = self.statistics_[col].get(key, 0)

            if self.fixed_columns_ and self.first_values_ is not None:
                for col in self.fixed_columns_:
                    if key in self.first_values_.index:
                        value = self.first_values_.loc[key, col]
                    else:
                        # Key had no training rows: copy from the data at hand.
                        value = product_data[col].iloc[0]
                    new_rows_data[col] = value

            missing_data_list.append(pd.DataFrame(new_rows_data))

        if missing_data_list:
            X = pd.concat([X] + missing_data_list, ignore_index=True).reset_index(drop=True)
        return X
4 changes: 3 additions & 1 deletion src/special_preprocessing/first_special_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ def preprocessing() -> Pipeline:
return pipline


fill_strategy = {"price": "mean", "ship": 0, "discount": 0, "discount.1": 0}

preprocessing_pipeline = Pipeline(
steps=[
("base preprocessing", preprocessing()),
("fill_data_range", DateRangeFilledTransformer()),
("fill_data_range", DateRangeFilledTransformer(fill_config=fill_strategy)),
("grouping", GroupByDateTransformer()),
("features extraction", FeatureExtractionTransformer()),
("decomposition", SeriesDecompositionTransformer()),
Expand Down
6 changes: 4 additions & 2 deletions src/special_preprocessing/second_special_pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
from sklearn.pipeline import Pipeline

from src.special_preprocessing.date_transformers.features_extraction import FeatureExtractionTransformer
from src.special_preprocessing.date_transformers.series_comp import DateRangeFilledTransformer
from src.special_preprocessing.date_transformers.series_decomposition import Separation, SeriesDecompositionTransformer
from src.special_preprocessing.second_special_pipeline.preprocessing import (
CategoricalFeaturesTransform,
DateRangeFilledTransformerSec,
DiscountTransformer,
KeyTransformer,
RenameColumns,
)

fill_strategy = {"ship": 0, "discount": 0, "discount.1": 0}

preprocessing_pipeline = Pipeline(
steps=[
("rename", RenameColumns()),
("key", KeyTransformer()),
("discount", DiscountTransformer()),
("fill_data_range", DateRangeFilledTransformerSec()),
("fill_data_range", DateRangeFilledTransformer(fill_config=fill_strategy)),
("categorical_features_prep", CategoricalFeaturesTransform()),
("features extraction", FeatureExtractionTransformer()),
("decomposition", SeriesDecompositionTransformer()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> "DiscountTra
return self

def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """Derive a binary discount flag from the raw discount amount.

    Sets ``X["discount"]`` to 1 where ``X["discount.1"]`` is positive and
    0 otherwise. Mutates and returns the input frame.
    """
    # The diff left two consecutive assignments here; the first (boolean
    # version) was dead code immediately overwritten. Keep only the integer
    # flag, computed with a vectorized comparison instead of a row-wise apply.
    X["discount"] = (X["discount.1"] > 0).astype(int)
    return X


Expand Down Expand Up @@ -71,45 +71,3 @@ def transform(self, X: pd.DataFrame) -> pd.DataFrame:
if self._transformer is None:
raise NotFittedError()
return self._transformer.transform(X)


class DateRangeFilledTransformerSec(BaseEstimator, TransformerMixin):
    """Fill calendar gaps in each key's daily series with zero-valued rows.

    For every missing (key, day) pair a filler row is appended with
    ``ship``/``discount``/``discount.1`` set to 0, ``mark`` split into
    train/test around the last training date, and all remaining
    non-special columns copied from the key's first observed row.
    ``price`` is intentionally left unset (NaN) on filler rows.
    """

    def __init__(self) -> None:
        # fit() is stateless; the flag only records that it was called.
        self._is_fitted_: bool = False

    def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> "DateRangeFilledTransformerSec":
        """Stateless fit: mark the transformer as fitted and return self."""
        self._is_fitted_ = True
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return ``X`` with filler rows appended for every missing day."""
        # Last training date: filler days after it count as "test".
        cutoff = X[X["mark"] == "train"]["date"].max()
        handled = ["discount", "price", "discount.1", "key", "date", "ship", "mark"]
        carry_over = [c for c in X.columns if c not in handled]

        full_range = pd.date_range(X["date"].min(), X["date"].max(), freq="D")
        filler_frames = []
        for product_key in X["key"].unique():
            subset = X[X["key"] == product_key]
            absent = full_range.difference(set(subset["date"]))
            if absent.empty:
                continue
            filler = pd.DataFrame(
                {
                    "date": absent,
                    "ship": [0] * len(absent),
                    "discount": [0] * len(absent),
                    "discount.1": [0] * len(absent),
                    "key": [product_key] * len(absent),
                    "mark": np.where(absent <= cutoff, "train", "test"),
                }
            )
            # Propagate per-key constant columns from the first observed row.
            for c in carry_over:
                filler[c] = subset[c].iloc[0]
            filler_frames.append(filler)

        if filler_frames:
            X = pd.concat([X] + filler_frames, ignore_index=True)
        return X