diff --git a/pyproject.toml b/pyproject.toml index 26989ae..53d8442 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "views_stepshifter" -version = "0.4.0" +version = "1.0.0" description = "" authors = [ "Xiaolong Sun ", @@ -11,13 +11,14 @@ readme = "README.md" [tool.poetry.dependencies] python = ">=3.11,<3.15" -views_pipeline_core = ">=1.0.0,<2.0.0" +views_pipeline_core = ">=2.0.0,<3.0.0" scikit-learn = "^1.6.0" pandas = "^1.5.3" numpy = "^1.25.2" darts = "^0.30.0" lightgbm = "4.6.0" views_forecasts = "^0.5.5" +scipy = "1.15.1" # error with latest scipy 1.16.0. see https://github.com/statsmodels/statsmodels/issues?q=_lazywhere diff --git a/tests/test_hurdle_model.py b/tests/test_hurdle_model.py index 385fb2a..734276b 100644 --- a/tests/test_hurdle_model.py +++ b/tests/test_hurdle_model.py @@ -125,7 +125,7 @@ def test_predict(sample_config, sample_partitioner_dict, sample_dataframe): patch("views_stepshifter.models.hurdle_model.as_completed") as mock_as_completed, \ patch("views_stepshifter.models.hurdle_model.tqdm.tqdm") as mock_tqdm, \ patch("views_stepshifter.models.hurdle_model.ProcessPoolExecutor") as mock_ProcessPoolExecutor, \ - patch("views_stepshifter.models.hurdle_model.ModelManager._resolve_evaluation_sequence_number") as mock_sequence_number: + patch("views_stepshifter.models.hurdle_model.ForecastingModelManager._resolve_evaluation_sequence_number") as mock_sequence_number: # the else branch diff --git a/tests/test_stepshifter.py b/tests/test_stepshifter.py index 3a43b54..370d76a 100644 --- a/tests/test_stepshifter.py +++ b/tests/test_stepshifter.py @@ -3,7 +3,7 @@ import numpy as np from unittest.mock import patch, MagicMock, call from views_stepshifter.models.stepshifter import StepshifterModel -from views_pipeline_core.managers.model import ModelManager +from views_pipeline_core.managers.model import ModelManager, ForecastingModelManager @pytest.fixture def config(): @@ -182,7 +182,7 @@ def test_predict(config, partitioner_dict, sample_dataframe): patch("views_stepshifter.models.stepshifter.as_completed") as mock_as_completed, \ patch("views_stepshifter.models.stepshifter.tqdm.tqdm") as mock_tqdm, \ patch("views_stepshifter.models.stepshifter.ProcessPoolExecutor") as mock_ProcessPoolExecutor, \ - patch("views_stepshifter.models.stepshifter.ModelManager._resolve_evaluation_sequence_number") as mock_sequence_number: + patch("views_stepshifter.models.stepshifter.ForecastingModelManager._resolve_evaluation_sequence_number") as mock_sequence_number: # the else branch diff --git a/tests/test_stepshifter_manager.py b/tests/test_stepshifter_manager.py index 4cd430c..fb56a2d 100644 --- a/tests/test_stepshifter_manager.py +++ b/tests/test_stepshifter_manager.py @@ -118,18 +118,30 @@ def test_get_standardized_df(): """ Test the _get_standardized_df method to ensure it correctly standardizes the DataFrame. 
""" - df = pd.DataFrame({ + df1 = pd.DataFrame({ "a": [1.0, -1.0, np.inf, -np.inf, 3.0], "b": [4.0, 5.0, -6.0, 7.0, -8.0] }) - expected_df = pd.DataFrame({ + expected_df1 = pd.DataFrame({ "a": [1.0, 0.0, 0.0, 0.0, 3.0], "b": [4.0, 5.0, 0.0, 7.0, 0.0] }) - result_df = StepshifterManager._get_standardized_df(df) - pd.testing.assert_frame_equal(result_df, expected_df) - - + df2 = pd.DataFrame({ + "a": [[1.0, -1.0, np.inf], + [-np.inf, 3.0, 4.0]], + "b": [[4.0, 5.0, -6.0], + [7.0, -8.0, 9.0]], + }) + expected_df2 = pd.DataFrame({ + "a": [[1.0, 0.0, 0.0], + [0.0, 3.0, 4.0]], + "b": [[4.0, 5.0, 0.0], + [7.0, 0.0, 9.0]], + }) + result_df1 = StepshifterManager._get_standardized_df(df1) + result_df2 = StepshifterManager._get_standardized_df(df2) + pd.testing.assert_frame_equal(result_df1, expected_df1) + pd.testing.assert_frame_equal(result_df2, expected_df2) def test_split_hurdle_parameters(stepshifter_manager_hurdle): """ @@ -164,9 +176,6 @@ def test_get_model(stepshifter_manager, stepshifter_manager_hurdle, mock_partiti stepshifter_manager._get_model(mock_partitioner_dict) mock_stepshifter_model.assert_called_once_with(stepshifter_manager.config, mock_partitioner_dict) mock_hurdle_model.assert_not_called() - - - def test_train_model_artifact(stepshifter_manager, stepshifter_manager_hurdle): """ @@ -194,8 +203,6 @@ def test_train_model_artifact(stepshifter_manager, stepshifter_manager_hurdle): mock_split_hurdle.assert_called_once() - - def test_evaluate_model_artifact(stepshifter_manager): """ Test the _evaluate_model_artifact method to ensure it correctly evaluates the model artifact. @@ -233,8 +240,6 @@ def test_evaluate_model_artifact(stepshifter_manager): path_artifact = stepshifter_manager._model_path.artifacts / artifact_name assert path_artifact == Path("predictions_test_run_202401011200000/non_default_artifact.pkl") - - def test_forecast_model_artifact(stepshifter_manager): """ Test the _forecast_model_artifact method to ensure it correctly forecasts the model artifact. @@ -278,7 +283,6 @@ def test_forecast_model_artifact(stepshifter_manager): assert path_artifact == Path("predictions_test_run_202401011200000/non_default_artifact.pkl") mock_logger.exception.assert_called_once_with(f"Model artifact not found at {path_artifact}") - def test_evaluate_sweep(stepshifter_manager): """ Test the _evaluate_sweep method. 
@@ -297,7 +301,3 @@ def test_evaluate_sweep(stepshifter_manager): # mock_read_dataframe.assert_called_once() mock_model.predict.assert_called_once_with("test_run_type", eval_type) mock_get_standardized_df.assert_called_once() - - - - diff --git a/views_stepshifter/manager/stepshifter_manager.py b/views_stepshifter/manager/stepshifter_manager.py index 260f59a..1852284 100644 --- a/views_stepshifter/manager/stepshifter_manager.py +++ b/views_stepshifter/manager/stepshifter_manager.py @@ -1,28 +1,28 @@ -from views_pipeline_core.managers.model import ModelPathManager, ModelManager +from views_pipeline_core.managers.model import ModelPathManager, ForecastingModelManager from views_pipeline_core.configs.pipeline import PipelineConfig -from views_pipeline_core.files.utils import read_dataframe +from views_pipeline_core.files.utils import read_dataframe, generate_model_file_name from views_stepshifter.models.stepshifter import StepshifterModel from views_stepshifter.models.hurdle_model import HurdleModel +from views_stepshifter.models.shurf_model import ShurfModel import logging import pickle import pandas as pd import numpy as np from typing import Union, Optional, List, Dict -# from views_stepshifter.models.shurf import StepShiftedHurdleUncertainRF logger = logging.getLogger(__name__) -class StepshifterManager(ModelManager): +class StepshifterManager(ForecastingModelManager): def __init__( self, model_path: ModelPathManager, - wandb_notifications: bool = False, + wandb_notifications: bool = True, use_prediction_store: bool = True, ) -> None: super().__init__(model_path, wandb_notifications, use_prediction_store) self._is_hurdle = self._config_meta["algorithm"] == "HurdleModel" - self._is_shurf = self._config_meta["algorithm"] == "SHURF" + self._is_shurf = self._config_meta["algorithm"] == "ShurfModel" @staticmethod def _get_standardized_df(df: pd.DataFrame) -> pd.DataFrame: @@ -36,9 +36,16 @@ def _get_standardized_df(df: pd.DataFrame) -> pd.DataFrame: The standardized DataFrame """ - # post-process: replace inf and -inf with 0 - df = df.replace([np.inf, -np.inf], 0) - df = df.mask(df < 0, 0) + def standardize_value(value): + # 1) Replace inf, -inf, nan with 0; + # 2) Replace negative values with 0 + if isinstance(value, list): + return [0 if (v == np.inf or v == -np.inf or v < 0 or np.isnan(v)) else v for v in value] + else: + return 0 if (value == np.inf or value == -np.inf or value < 0 or np.isnan(value)) else value + + df = df.applymap(standardize_value) + return df def _split_hurdle_parameters(self): @@ -78,8 +85,8 @@ def _get_model(self, partitioner_dict: dict): """ if self._is_hurdle: model = HurdleModel(self.config, partitioner_dict) - # elif self._is_shurf: - # model = StepShiftedHurdleUncertainRF(self.config, partitioner_dict) + elif self._is_shurf: + model = ShurfModel(self.config, partitioner_dict) else: self.config["model_reg"] = self.config["algorithm"] model = StepshifterModel(self.config, partitioner_dict) @@ -96,7 +103,7 @@ def _train_model_artifact(self): path_raw = self._model_path.data_raw path_artifacts = self._model_path.artifacts # W&B does not directly support nested dictionaries for hyperparameters - if self.config["sweep"] and self._is_hurdle: + if self.config["sweep"] and (self._is_hurdle or self._is_shurf): self.config = self._split_hurdle_parameters() run_type = self.config["run_type"] @@ -109,7 +116,7 @@ def _train_model_artifact(self): stepshift_model.fit(df_viewser) if not self.config["sweep"]: - model_filename = ModelManager.generate_model_file_name( + 
model_filename = generate_model_file_name(
                 run_type, file_extension=".pkl"
             )
             stepshift_model.save(path_artifacts / model_filename)
@@ -128,7 +135,6 @@
         Returns:
             A list of DataFrames containing the evaluation results
         """
 
-        path_raw = self._model_path.data_raw
         path_artifacts = self._model_path.artifacts
         run_type = self.config["run_type"]
@@ -157,10 +163,9 @@
                 raise
 
         df_predictions = stepshift_model.predict(run_type, eval_type)
-        if not self._is_shurf:
-            df_predictions = [
-                StepshifterManager._get_standardized_df(df) for df in df_predictions
-            ]
+        df_predictions = [
+            StepshifterManager._get_standardized_df(df) for df in df_predictions
+        ]
         return df_predictions
 
     def _forecast_model_artifact(self, artifact_name: str) -> pd.DataFrame:
@@ -173,7 +178,6 @@
         Returns:
             The forecasted DataFrame
         """
 
-        path_raw = self._model_path.data_raw
         path_artifacts = self._model_path.artifacts
         run_type = self.config["run_type"]
@@ -207,7 +211,6 @@
         return df_prediction
 
     def _evaluate_sweep(self, eval_type: str, model: any) -> List[pd.DataFrame]:
-        path_raw = self._model_path.data_raw
         run_type = self.config["run_type"]
 
         df_predictions = model.predict(run_type, eval_type)
@@ -215,4 +218,4 @@
             StepshifterManager._get_standardized_df(df) for df in df_predictions
         ]
 
-        return df_predictions
+        return df_predictions
\ No newline at end of file
diff --git a/views_stepshifter/models/hurdle_model.py b/views_stepshifter/models/hurdle_model.py
index 639adf1..e8845d5 100644
--- a/views_stepshifter/models/hurdle_model.py
+++ b/views_stepshifter/models/hurdle_model.py
@@ -1,4 +1,4 @@
-from views_pipeline_core.managers.model import ModelManager
+from views_pipeline_core.managers.model import ForecastingModelManager
 from views_stepshifter.models.stepshifter import StepshifterModel
 from views_stepshifter.models.validation import views_validate
 from sklearn.utils.validation import check_is_fitted
@@ -7,8 +7,6 @@
 import logging
 import tqdm
 from concurrent.futures import ProcessPoolExecutor, as_completed
-# import multiprocessing
-# multiprocessing.set_start_method('spawn')
 from functools import partial
 
 logger = logging.getLogger(__name__)
@@ -149,29 +147,19 @@ def fit(self, df: pd.DataFrame):
         self._models = models
         self.is_fitted_ = True
 
-        # for step in tqdm.tqdm(self._steps, desc="Fitting model for step", leave=True):
-        #     # Fit binary-like stage using a classification model, but the target is binary (0 or 1)
-        #     binary_model = self._clf(lags_past_covariates=[-step], **self._clf_params)
-        #     binary_model.fit(target_binary, past_covariates=self._past_cov)
-
-        #     # Fit positive stage using the regression model
-        #     positive_model = self._reg(lags_past_covariates=[-step], **self._reg_params)
-        #     positive_model.fit(target_pos, past_covariates=past_cov_pos)
-        #     self._models[step] = (binary_model, positive_model)
-        #     self.is_fitted_ = True
-
     def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame:
         check_is_fitted(self, "is_fitted_")
 
         if run_type != "forecasting":
-            final_preds = []
+            if eval_type == "standard":
                 total_sequence_number = (
-                    ModelManager._resolve_evaluation_sequence_number(eval_type)
+                    ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)
                 )
                 if self.get_device_params().get("device") == "cuda":
+                    preds = []
                     for sequence_number in
tqdm.tqdm( - range(ModelManager._resolve_evaluation_sequence_number(eval_type)), + range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), desc="Predicting for sequence number", ): pred_by_step_binary = [ @@ -186,15 +174,15 @@ def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: ) for step in self._steps ] - final_pred = pd.concat(pred_by_step_binary, axis=0) * pd.concat(pred_by_step_positive, axis=0) - final_preds.append(final_pred) - return final_preds + pred = pd.concat(pred_by_step_binary, axis=0) * pd.concat(pred_by_step_positive, axis=0) + preds.append(pred) + else: preds = [None] * total_sequence_number with ProcessPoolExecutor() as executor: futures = { executor.submit(self._predict_by_sequence, sequence_number): sequence_number - for sequence_number in range(ModelManager._resolve_evaluation_sequence_number(eval_type)) + for sequence_number in range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)) } for future in tqdm.tqdm( as_completed(futures.keys()), @@ -203,7 +191,10 @@ def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: ): sequence_number = futures[future] preds[sequence_number] = future.result() - return preds + else: + raise ValueError( + f"{eval_type} is not supported now. Please use 'standard' evaluation type." + ) else: if self.get_device_params().get("device") == "cuda": @@ -217,10 +208,10 @@ def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: self._predict_by_step(self._models[step][1], step, 0) ) - final_preds = pd.concat(pred_by_step_binary, axis=0) * pd.concat( + preds = pd.concat(pred_by_step_binary, axis=0) * pd.concat( pred_by_step_positive, axis=0 ) - return final_preds + else: with ProcessPoolExecutor() as executor: futures_binary = { @@ -257,4 +248,4 @@ def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: pd.concat(pred_by_step_binary, axis=0).sort_index() * pd.concat(pred_by_step_positive, axis=0).sort_index() ) - return preds + return preds diff --git a/views_stepshifter/models/shurf_model.py b/views_stepshifter/models/shurf_model.py new file mode 100644 index 0000000..1dc7af4 --- /dev/null +++ b/views_stepshifter/models/shurf_model.py @@ -0,0 +1,289 @@ +from views_pipeline_core.managers.model import ForecastingModelManager +from views_stepshifter.models.hurdle_model import HurdleModel +from views_stepshifter.models.validation import views_validate +from sklearn.utils.validation import check_is_fitted +import pandas as pd +from typing import List, Dict +import numpy as np +import logging +from tqdm import tqdm + +logger = logging.getLogger(__name__) + + +class ShurfModel(HurdleModel): + def __init__(self, config: Dict, partitioner_dict: Dict[str, List[int]]): + super().__init__(config, partitioner_dict) + self._clf_params = self._get_parameters(config)["clf"] + self._reg_params = self._get_parameters(config)["reg"] + + self._submodel_list = [] + self._submodels_to_train = config["submodels_to_train"] + self._log_target = config["log_target"] + self._pred_samples = config["pred_samples"] + self._draw_dist = config["draw_dist"] + self._draw_sigma = config["draw_sigma"] + + # self._n_estimators = config['parameters']['n_estimators'] + # self._max_features = config['max_features'] + # self._max_depth = config['max_depth'] + # self._max_samples = config['max_samples'] + # self._geo_unit_samples = config['geo_unit_samples'] + # self._n_jobs = config['n_jobs'] + + @views_validate + def fit(self, df: pd.DataFrame): + 
""" + Generate predictions using the trained submodels. + This method performs the following steps: + 1. Prepares the data for classification and regression stages. + 2. Iterates over each submodel to generate predictions: + - Predicts probabilities using the classification model. + - Predicts target values using the regression model. + - Handles infinite values in predictions. + 3. Draws samples from the distributions: + - For each prediction sample, combines classification and regression predictions. + - Applies binomial, Poisson, or lognormal distributions to generate final predictions. + 4. Aggregates the predictions from all submodels into a final DataFrame. + Returns: + pd.DataFrame: A DataFrame containing the final set of predictions with indices set to 'draw'. + """ + df = self._process_data(df) + self._prepare_time_series(df) + self._clf = self._resolve_clf_model(self._config["model_clf"]) + self._reg = self._resolve_reg_model(self._config["model_reg"]) + + target_binary = [ + s.map(lambda x: (x > 0).astype(float)) for s in self._target_train + ] + + target_pos, past_cov_pos = zip( + *[ + (t, p) + for t, p in zip(self._target_train, self._past_cov) + if (t.values() > 0).any() + ] + ) + + for i in tqdm(range(self._submodels_to_train), desc="Training submodel"): + + for step in tqdm(self._steps, desc=f"Steps for submodel {i+1}"): + # Fit binary-like stage using a regression model, but the target is binary (0 or 1) + binary_model = self._clf( + lags_past_covariates=[-step], **self._clf_params + ) + binary_model.fit(target_binary, past_covariates=self._past_cov) + + # Fit positive stage using the regression model + positive_model = self._reg( + lags_past_covariates=[-step], **self._reg_params + ) + positive_model.fit(target_pos, past_covariates=past_cov_pos) + self._models[step] = (binary_model, positive_model) + + self._submodel_list.append(self._models) + + self.is_fitted_ = True + + def predict_sequence(self, sequence_number) -> pd.DataFrame: + """ + Predicts n draws of outcomes based on the provided DataFrame . + + Parameters: + ----------- + self: StepShiftedHurdleUncertainRF + The model object. + + run_type : str + The type of run to perform. Currently it is unlikely to affect the behaviour of the function. + + eval_type : str + The type of evaluation to perform. Currently it is unlikely to affect the behaviour of the function. + + sequence_number : int + The sequence number to predict outcomes for. + + + Returns: + -------- + pd.DataFrame + The final predictions as a DataFrame. 
+ """ + + final_preds = [] + submodel_number = 0 + + for submodel in tqdm( + self._submodel_list, desc=f"Predicting submodel number", leave=True + ): + pred_by_step_binary = [ + self._predict_by_step(submodel[step][0], step, sequence_number) + for step in self._steps + ] + pred_by_step_positive = [ + self._predict_by_step(submodel[step][1], step, sequence_number) + for step in self._steps + ] + + pred_concat_binary = pd.concat(pred_by_step_binary, axis=0) + + pred_concat_binary.rename( + columns={f"pred_{self._targets}": "Classification"}, inplace=True + ) + pred_concat_positive = pd.concat(pred_by_step_positive, axis=0) + pred_concat_positive.rename( + columns={f"pred_{self._targets}": "Regression"}, inplace=True + ) + pred_concat = pd.concat([pred_concat_binary, pred_concat_positive], axis=1) + pred_concat["submodel"] = submodel_number + + final_preds.append(pred_concat) + submodel_number += 1 + + final_preds_aslists = pd.concat(final_preds, axis=0) + + # Drawing samples from the classification model + # Ensuring that the classification probabilities are between 0 and 1: + final_preds_aslists["Classification"] = final_preds_aslists[ + "Classification" + ].apply(lambda x: np.clip(x, 0, 1)) + final_preds_aslists["ClassificationSample"] = final_preds_aslists[ + "Classification" + ].apply(lambda x: np.random.binomial(1, x, self._pred_samples)) + + # Drawing samples from the regression model + if self._log_target == True: + if ( + self._draw_dist == "Poisson" + ): # Note: the Poisson distribution assumes a non-log-transformed target, so not defined here + final_preds_aslists["RegressionSample"] = final_preds_aslists[ + "Regression" + ] + if self._draw_dist == "Lognormal": + # Draw from normal distribution for log-transformed outcomes, then exponentiate, then round to integer + final_preds_aslists["RegressionSample"] = final_preds_aslists[ + "Regression" + ].apply( + lambda x: np.abs( + np.rint( + np.expm1( + np.random.normal( + x, self._draw_sigma, self._pred_samples + ) + ) + ) + ) + ) + + if self._log_target == False: + if ( + self._draw_dist == "Poisson" + ): # Note: this assumes a non-log-transformed target + final_preds_aslists["RegressionSample"] = final_preds_aslists[ + "Regression" + ] + if self._draw_dist == "Lognormal": + final_preds_aslists["RegressionSample"] = final_preds_aslists[ + "Regression" + ].apply( + lambda x: np.abs( + np.rint( + np.expm1( + np.random.normal( + np.log1p(x), self._draw_sigma, self._pred_samples + ) + ) + ) + ) + ) + + if self._draw_dist == "": + final_preds_aslists["RegressionSample"] = final_preds_aslists["Regression"] + + # 'Explode' the samples to get one row per sample + final_preds_full = final_preds_aslists.explode( + ["ClassificationSample", "RegressionSample"] + ) + final_preds_full["Prediction"] = ( + final_preds_full["ClassificationSample"] + * final_preds_full["RegressionSample"] + ) + + # Ensuring that the final predictions are positive: + final_preds_full["Prediction"] = final_preds_full["Prediction"].apply( + lambda x: np.clip(x, 0, None) + ) + + # Column for the main prediction: + pred_col_name = "pred_" + self._targets + final_preds_full[pred_col_name] = final_preds_full["Prediction"] + + # Log-transforming the final predictions if the target is log-transformed, exponentiating if not, + # and adding a column with the log-transformed predictions + if self._log_target == True: + final_preds_full["LogPrediction"] = final_preds_full["Prediction"] + final_preds_full["Prediction"] = np.expm1(final_preds_full["Prediction"]) + if self._log_target 
== False: + final_preds_full["LogPrediction"] = np.log1p(final_preds_full["Prediction"]) + + final_preds_full.drop( + columns=[ + "Classification", + "Regression", + "ClassificationSample", + "RegressionSample", + "submodel", + "Prediction", + "LogPrediction", + ], + inplace=True, + ) + final_preds = pd.DataFrame( + final_preds_full.groupby(["month_id", "country_id"])[pred_col_name].apply( + list + ) + ) + + return final_preds + + def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: + """ + Predicts outcomes based on the provided DataFrame and run type. + + Parameters: + ----------- + df : pd.DataFrame + The input data for making predictions. + run_type : str + The type of run to perform. If 'forecasting', a single prediction is made. + Otherwise, multiple predictions are made based on the evaluation sequence number. + eval_type : str, optional + The type of evaluation to perform. Default is "standard". + + Returns: + -------- + pd.DataFrame + The final predictions as a DataFrame. + """ + check_is_fitted(self, "is_fitted_") + + if run_type != "forecasting": + preds = [] + if eval_type == "standard": + for sequence_number in tqdm( + range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), + desc=f"Predicting for sequence number", + leave=True, + ): + temp_preds_full = self.predict_sequence(sequence_number) + preds.append(temp_preds_full) + else: + raise ValueError( + f"{eval_type} is not supported now. Please use 'standard' evaluation type." + ) + + else: + sequence_number = 0 + preds = self.predict_sequence(sequence_number) + + return preds diff --git a/views_stepshifter/models/stepshifter.py b/views_stepshifter/models/stepshifter.py index 2d42e5a..264d97b 100644 --- a/views_stepshifter/models/stepshifter.py +++ b/views_stepshifter/models/stepshifter.py @@ -6,7 +6,7 @@ from sklearn.utils.validation import check_is_fitted from typing import List, Dict from views_stepshifter.models.validation import views_validate -from views_pipeline_core.managers.model import ModelManager +from views_pipeline_core.managers.model import ModelManager, ForecastingModelManager import tqdm from concurrent.futures import ProcessPoolExecutor, as_completed import torch @@ -237,26 +237,14 @@ def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: if run_type != "forecasting": if eval_type == "standard": - # preds = [] - # for sequence_number in tqdm.tqdm( - # range(ModelManager._resolve_evaluation_sequence_number(eval_type)), - # desc="Predicting for sequence number", - # ): - # pred_by_step = [ - # self._predict_by_step(self._models[step], step, sequence_number) - # for step in self._steps - # ] - # pred = pd.concat(pred_by_step, axis=0) - # preds.append(pred) - total_sequence_number = ( - ModelManager._resolve_evaluation_sequence_number(eval_type) + ForecastingModelManager._resolve_evaluation_sequence_number(eval_type) ) if self.get_device_params().get("device") == "cuda": preds = [] for sequence_number in tqdm.tqdm( - range(ModelManager._resolve_evaluation_sequence_number(eval_type)), + range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), desc="Predicting for sequence number", ): pred_by_step = [ @@ -282,18 +270,19 @@ def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: ): sequence_number = futures[future] preds[sequence_number] = future.result() + else: + raise ValueError( + f"{eval_type} is not supported now. Please use 'standard' evaluation type." 
+ ) else: - # preds = [] - # for step in tqdm.tqdm(self._steps, desc="Predicting for steps"): - # preds.append(self._predict_by_step(self._models[step], step, 0)) - # preds = pd.concat(preds, axis=0).sort_index() if self.get_device_params().get("device") == "cuda": preds = [] for step in tqdm.tqdm(self._steps, desc="Predicting for steps"): preds.append(self._predict_by_step(self._models[step], step, 0)) preds = pd.concat(preds, axis=0).sort_index() + else: with ProcessPoolExecutor() as executor: futures = {