From 67f0f62f529e94967dfe0c024381ddaa29bb9902 Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Fri, 7 Nov 2025 10:30:34 +0100 Subject: [PATCH 01/14] change based on vpc cleanup --- .../manager/stepshifter_manager.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/views_stepshifter/manager/stepshifter_manager.py b/views_stepshifter/manager/stepshifter_manager.py index dd75270..dcdf16f 100644 --- a/views_stepshifter/manager/stepshifter_manager.py +++ b/views_stepshifter/manager/stepshifter_manager.py @@ -58,7 +58,7 @@ def _split_hurdle_parameters(self): """ clf_dict = {} reg_dict = {} - config = self.config + config = self.configs for key, value in config.items(): if key.startswith("clf_"): @@ -84,12 +84,12 @@ def _get_model(self, partitioner_dict: dict): The model object based on the algorithm specified in the config """ if self._is_hurdle: - model = HurdleModel(self.config, partitioner_dict) + model = HurdleModel(self.configs, partitioner_dict) elif self._is_shurf: - model = ShurfModel(self.config, partitioner_dict) + model = ShurfModel(self.configs, partitioner_dict) else: - self.config["model_reg"] = self.config["algorithm"] - model = StepshifterModel(self.config, partitioner_dict) + self.configs = {"model_reg": self.configs["algorithm"]} + model = StepshifterModel(self.configs, partitioner_dict) return model @@ -103,10 +103,10 @@ def _train_model_artifact(self): path_raw = self._model_path.data_raw path_artifacts = self._model_path.artifacts # W&B does not directly support nested dictionaries for hyperparameters - if self.config["sweep"] and (self._is_hurdle or self._is_shurf): - self.config = self._split_hurdle_parameters() + if self.configs["sweep"] and (self._is_hurdle or self._is_shurf): + self.configs = self._split_hurdle_parameters() - run_type = self.config["run_type"] + run_type = self.configs["run_type"] df_viewser = read_dataframe( path_raw / f"{run_type}_viewser_df{PipelineConfig.dataframe_format}" ) @@ -115,7 +115,7 @@ def _train_model_artifact(self): stepshift_model = self._get_model(partitioner_dict) stepshift_model.fit(df_viewser) - if not self.config["sweep"]: + if not self.configs["sweep"]: model_filename = generate_model_file_name( run_type, file_extension=".pkl" ) @@ -136,7 +136,7 @@ def _evaluate_model_artifact( A list of DataFrames containing the evaluation results """ path_artifacts = self._model_path.artifacts - run_type = self.config["run_type"] + run_type = self.configs["run_type"] # if an artifact name is provided through the CLI, use it. # Otherwise, get the latest model artifact based on the run type @@ -153,7 +153,7 @@ def _evaluate_model_artifact( ) path_artifact = self._model_path.get_latest_model_artifact_path(run_type) - self.config["timestamp"] = path_artifact.stem[-15:] + self.configs = {"timestamp": path_artifact.stem[-15:]} try: with open(path_artifact, "rb") as f: @@ -179,7 +179,7 @@ def _forecast_model_artifact(self, artifact_name: str) -> pd.DataFrame: The forecasted DataFrame """ path_artifacts = self._model_path.artifacts - run_type = self.config["run_type"] + run_type = self.configs["run_type"] # if an artifact name is provided through the CLI, use it. # Otherwise, get the latest model artifact based on the run type @@ -196,7 +196,7 @@ def _forecast_model_artifact(self, artifact_name: str) -> pd.DataFrame: ) path_artifact = self._model_path.get_latest_model_artifact_path(run_type) - self.config["timestamp"] = path_artifact.stem[-15:] + self.configs = {"timestamp": path_artifact.stem[-15:]} try: with open(path_artifact, "rb") as f: @@ -211,7 +211,7 @@ def _forecast_model_artifact(self, artifact_name: str) -> pd.DataFrame: return df_prediction def _evaluate_sweep(self, eval_type: str, model: any) -> List[pd.DataFrame]: - run_type = self.config["run_type"] + run_type = self.configs["run_type"] df_predictions = model.predict(run_type, eval_type) df_predictions = [ From eb731e86f964499a877c9e2f7b09287a5ce9900d Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Thu, 13 Nov 2025 16:00:22 +0100 Subject: [PATCH 02/14] add threadpool --- views_stepshifter/models/hurdle_model.py | 306 ++++++++++++----------- views_stepshifter/models/stepshifter.py | 289 +++++++++++++-------- 2 files changed, 348 insertions(+), 247 deletions(-) diff --git a/views_stepshifter/models/hurdle_model.py b/views_stepshifter/models/hurdle_model.py index bf60c40..e358f74 100644 --- a/views_stepshifter/models/hurdle_model.py +++ b/views_stepshifter/models/hurdle_model.py @@ -70,53 +70,46 @@ def _resolve_clf_model(self, func_name: str): f"Change the model in the config file." ) - def _fit_by_step(self, step, target_binary, target_pos, past_cov_pos): + def _fit_by_step(self, step): # Fit binary-like stage using a classification model binary_model = self._clf(lags_past_covariates=[-step], **self._clf_params) - binary_model.fit(target_binary, past_covariates=self._past_cov) + binary_model.fit(self._target_binary, past_covariates=self._past_cov) # Fit positive stage using the regression model positive_model = self._reg(lags_past_covariates=[-step], **self._reg_params) - positive_model.fit(target_pos, past_covariates=past_cov_pos) + positive_model.fit(self._target_pos, past_covariates=self._past_cov_pos) return (binary_model, positive_model) + def _predict_by_step(self, step: int, sequence_number: int=0): + # OVERRIDE: Help function for parallel execution + binary_model, positive_model = self._models[step] + pred_binary = self._predict(binary_model, step, sequence_number) + pred_positive = self._predict(positive_model, step, sequence_number) + return pred_binary.sort_index() * pred_positive.sort_index() + def _predict_by_sequence(self, sequence_number): - pred_by_step_binary = [] - pred_by_step_positive = [] - + pred_by_step = [] for step in self._steps: - # Predict for binary model - pred_binary = self._predict_by_step(self._models[step][0], step, sequence_number) - pred_by_step_binary.append(pred_binary) - - # Predict for positive model - pred_positive = self._predict_by_step(self._models[step][1], step, sequence_number) - pred_by_step_positive.append(pred_positive) - - final_pred = ( - pd.concat(pred_by_step_binary, axis=0).sort_index() * - pd.concat(pred_by_step_positive, axis=0).sort_index() - ) + pred = self._predict_by_step(step, sequence_number) + pred_by_step.append(pred) + return pd.concat(pred_by_step, axis=0).sort_index() - return final_pred - @views_validate def fit(self, df: pd.DataFrame): df = self._process_data(df) self._prepare_time_series(df) self._clf = self._resolve_clf_model(self._config["model_clf"]) self._reg = self._resolve_reg_model(self._config["model_reg"]) - # Binary outcome (event/no-event) # According to the DARTS doc, if timeseries uses a numeric type different from np.float32 or np.float64, not all functionalities may work properly. # So use astype(float) instead of astype(int) (we should have binary outputs 0,1 though) - target_binary = [ + self._target_binary = [ s.map(lambda x: (x > 0).astype(float)) for s in self._target_train ] # Positive outcome (for cases where target > 0) - target_pos, past_cov_pos = zip( + self._target_pos, self._past_cov_pos = zip( *[ (t, p) for t, p in zip(self._target_train, self._past_cov) @@ -124,128 +117,159 @@ def fit(self, df: pd.DataFrame): ] ) - if self.get_device_params().get("device") == "cuda": - for step in tqdm.tqdm(self._steps, desc="Fitting model for step", leave=True): - # Fit binary-like stage using a classification model, but the target is binary (0 or 1) - binary_model = self._clf(lags_past_covariates=[-step], **self._clf_params) - binary_model.fit(target_binary, past_covariates=self._past_cov) - - # Fit positive stage using the regression model - positive_model = self._reg(lags_past_covariates=[-step], **self._reg_params) - positive_model.fit(target_pos, past_covariates=past_cov_pos) - self._models[step] = (binary_model, positive_model) - else: - models = {} - with ProcessPoolExecutor() as executor: - futures = { - executor.submit(self._fit_by_step, step, target_binary, target_pos, past_cov_pos): step - for step in self._steps - } - for future in tqdm.tqdm(as_completed(futures.keys()), desc="Fitting models for steps", total=len(futures)): - step = futures[future] - models[step] = future.result() - self._models = models + model_list = self._execute_parallel( + func=self._fit_by_step, + items=self._steps, + desc="Fitting models for steps" + ) + self._models = {step: model for step, model in zip(self._steps, model_list)} self.is_fitted_ = True - def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: - check_is_fitted(self, "is_fitted_") + # @views_validate + # def fit(self, df: pd.DataFrame): + # df = self._process_data(df) + # self._prepare_time_series(df) + # self._clf = self._resolve_clf_model(self._config["model_clf"]) + # self._reg = self._resolve_reg_model(self._config["model_reg"]) - if run_type != "forecasting": + # # Binary outcome (event/no-event) + # # According to the DARTS doc, if timeseries uses a numeric type different from np.float32 or np.float64, not all functionalities may work properly. + # # So use astype(float) instead of astype(int) (we should have binary outputs 0,1 though) + # target_binary = [ + # s.map(lambda x: (x > 0).astype(float)) for s in self._target_train + # ] - if eval_type == "standard": - total_sequence_number = ( - ForecastingModelManager._resolve_evaluation_sequence_number(eval_type) - ) - if self.get_device_params().get("device") == "cuda": - preds = [] - for sequence_number in tqdm.tqdm( - range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), - desc="Predicting for sequence number", - ): - pred_by_step_binary = [ - self._predict_by_step( - self._models[step][0], step, sequence_number - ) - for step in self._steps - ] - pred_by_step_positive = [ - self._predict_by_step( - self._models[step][1], step, sequence_number - ) - for step in self._steps - ] - pred = pd.concat(pred_by_step_binary, axis=0) * pd.concat(pred_by_step_positive, axis=0) - preds.append(pred) - - else: - preds = [None] * total_sequence_number - with ProcessPoolExecutor() as executor: - futures = { - executor.submit(self._predict_by_sequence, sequence_number): sequence_number - for sequence_number in range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)) - } - for future in tqdm.tqdm( - as_completed(futures.keys()), - desc="Predicting for sequence number", - total=len(futures), - ): - sequence_number = futures[future] - preds[sequence_number] = future.result() - else: - raise ValueError( - f"{eval_type} is not supported now. Please use 'standard' evaluation type." - ) + # # Positive outcome (for cases where target > 0) + # target_pos, past_cov_pos = zip( + # *[ + # (t, p) + # for t, p in zip(self._target_train, self._past_cov) + # if (t.values() > 0).any() + # ] + # ) - else: - if self.get_device_params().get("device") == "cuda": - pred_by_step_binary = [] - pred_by_step_positive = [] - for step in tqdm.tqdm(self._steps, desc="Predicting for step", total=len(self._steps)): - pred_by_step_binary.append( - self._predict_by_step(self._models[step][0], step, 0) - ) - pred_by_step_positive.append( - self._predict_by_step(self._models[step][1], step, 0) - ) + # if self.get_device_params().get("device") == "cuda": + # for step in tqdm.tqdm(self._steps, desc="Fitting model for step", leave=True): + # # Fit binary-like stage using a classification model, but the target is binary (0 or 1) + # binary_model = self._clf(lags_past_covariates=[-step], **self._clf_params) + # binary_model.fit(target_binary, past_covariates=self._past_cov) + + # # Fit positive stage using the regression model + # positive_model = self._reg(lags_past_covariates=[-step], **self._reg_params) + # positive_model.fit(target_pos, past_covariates=past_cov_pos) + # self._models[step] = (binary_model, positive_model) + # else: + # models = {} + # with ProcessPoolExecutor() as executor: + # futures = { + # executor.submit(self._fit_by_step, step, target_binary, target_pos, past_cov_pos): step + # for step in self._steps + # } + # for future in tqdm.tqdm(as_completed(futures.keys()), desc="Fitting models for steps", total=len(futures)): + # step = futures[future] + # models[step] = future.result() + # self._models = models + # self.is_fitted_ = True + + # def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: + # check_is_fitted(self, "is_fitted_") + + # if run_type != "forecasting": + + # if eval_type == "standard": + # total_sequence_number = ( + # ForecastingModelManager._resolve_evaluation_sequence_number(eval_type) + # ) + # if self.get_device_params().get("device") == "cuda": + # preds = [] + # for sequence_number in tqdm.tqdm( + # range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), + # desc="Predicting for sequence number", + # ): + # pred_by_step_binary = [ + # self._predict_by_step( + # self._models[step][0], step, sequence_number + # ) + # for step in self._steps + # ] + # pred_by_step_positive = [ + # self._predict_by_step( + # self._models[step][1], step, sequence_number + # ) + # for step in self._steps + # ] + # pred = pd.concat(pred_by_step_binary, axis=0) * pd.concat(pred_by_step_positive, axis=0) + # preds.append(pred) + + # else: + # preds = [None] * total_sequence_number + # with ProcessPoolExecutor() as executor: + # futures = { + # executor.submit(self._predict_by_sequence, sequence_number): sequence_number + # for sequence_number in range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)) + # } + # for future in tqdm.tqdm( + # as_completed(futures.keys()), + # desc="Predicting for sequence number", + # total=len(futures), + # ): + # sequence_number = futures[future] + # preds[sequence_number] = future.result() + # else: + # raise ValueError( + # f"{eval_type} is not supported now. Please use 'standard' evaluation type." + # ) + + # else: + # if self.get_device_params().get("device") == "cuda": + # pred_by_step_binary = [] + # pred_by_step_positive = [] + # for step in tqdm.tqdm(self._steps, desc="Predicting for step", total=len(self._steps)): + # pred_by_step_binary.append( + # self._predict_by_step(self._models[step][0], step, 0) + # ) + # pred_by_step_positive.append( + # self._predict_by_step(self._models[step][1], step, 0) + # ) - preds = pd.concat(pred_by_step_binary, axis=0) * pd.concat( - pred_by_step_positive, axis=0 - ) + # preds = pd.concat(pred_by_step_binary, axis=0) * pd.concat( + # pred_by_step_positive, axis=0 + # ) - else: - with ProcessPoolExecutor() as executor: - futures_binary = { - step: executor.submit( - self._predict_by_step, self._models[step][0], step, 0 - ) - for step in self._steps - } - futures_positive = { - step: executor.submit( - self._predict_by_step, self._models[step][1], step, 0 - ) - for step in self._steps - } - - pred_by_step_binary = [ - future.result() - for future in tqdm.tqdm( - as_completed(futures_binary.values()), - desc="Predicting binary outcomes", - total=len(futures_binary), - ) - ] - pred_by_step_positive = [ - future.result() - for future in tqdm.tqdm( - as_completed(futures_positive.values()), - desc="Predicting positive outcomes", - total=len(futures_positive), - ) - ] - - preds = ( - pd.concat(pred_by_step_binary, axis=0).sort_index() - * pd.concat(pred_by_step_positive, axis=0).sort_index() - ) - return preds + # else: + # with ProcessPoolExecutor() as executor: + # futures_binary = { + # step: executor.submit( + # self._predict_by_step, self._models[step][0], step, 0 + # ) + # for step in self._steps + # } + # futures_positive = { + # step: executor.submit( + # self._predict_by_step, self._models[step][1], step, 0 + # ) + # for step in self._steps + # } + + # pred_by_step_binary = [ + # future.result() + # for future in tqdm.tqdm( + # as_completed(futures_binary.values()), + # desc="Predicting binary outcomes", + # total=len(futures_binary), + # ) + # ] + # pred_by_step_positive = [ + # future.result() + # for future in tqdm.tqdm( + # as_completed(futures_positive.values()), + # desc="Predicting positive outcomes", + # total=len(futures_positive), + # ) + # ] + + # preds = ( + # pd.concat(pred_by_step_binary, axis=0).sort_index() + # * pd.concat(pred_by_step_positive, axis=0).sort_index() + # ) + # return preds diff --git a/views_stepshifter/models/stepshifter.py b/views_stepshifter/models/stepshifter.py index 264d97b..67afe8b 100644 --- a/views_stepshifter/models/stepshifter.py +++ b/views_stepshifter/models/stepshifter.py @@ -4,11 +4,11 @@ import logging from darts import TimeSeries from sklearn.utils.validation import check_is_fitted -from typing import List, Dict +from typing import List, Dict, Callable from views_stepshifter.models.validation import views_validate from views_pipeline_core.managers.model import ModelManager, ForecastingModelManager import tqdm -from concurrent.futures import ProcessPoolExecutor, as_completed +from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor import torch from functools import partial @@ -41,6 +41,46 @@ def get_device_params(): else: return {} + def _execute_parallel(self, func: Callable, items: List, desc: str, max_workers: int=None): + """ + Executes a function in parallel, automatically choosing + ThreadPoolExecutor for CUDA or ProcessPoolExecutor for CPU. + + Args: + func: The function to execute. + items: A list of items to submit to the function. + desc: The description for the tqdm progress bar. + max_workers: The max_workers for the ThreadPoolExecutor (for CUDA). + ProcessPoolExecutor will use its default. + Returns: + A list of results, in the order the items were submitted. + """ + if self.get_device_params().get("device") == "cuda": + ExecutorClass = ThreadPoolExecutor + mw = max_workers if max_workers is not None else len(items) + executor_kwargs = {"max_workers": mw} + desc_prefix = "GPU" + else: + ExecutorClass = ProcessPoolExecutor + executor_kwargs = {} + desc_prefix = "CPU" + + # Store results by their original index to return them in order + futures = {} + results = [None] * len(items) + + with ExecutorClass(**executor_kwargs) as executor: + for index, item in enumerate(items): + future = executor.submit(func, item) + futures[future] = index + + full_desc = f"{desc_prefix}: {desc}" + for future in tqdm.tqdm(as_completed(futures.keys()), desc=full_desc, total=len(futures)): + index = futures[future] + results[index] = future.result() + + return results + def _resolve_reg_model(self, func_name: str): """ Lookup table for supported regression models @@ -50,6 +90,7 @@ def _resolve_reg_model(self, func_name: str): match func_name: case "XGBRFRegressor": from views_stepshifter.models.darts_model import XGBRFModel + if self.get_device_params().get("device") == "cuda": logger.info("\033[92mUsing CUDA for XGBRFRegressor\033[0m") cuda_params = {"tree_method": "hist", "device": "cuda"} @@ -57,6 +98,7 @@ def _resolve_reg_model(self, func_name: str): return XGBRFModel case "XGBRegressor": from darts.models import XGBModel + if self.get_device_params().get("device") == "cuda": logger.info("\033[92mUsing CUDA for XGBRegressor\033[0m") cuda_params = {"tree_method": "hist", "device": "cuda"} @@ -64,10 +106,11 @@ def _resolve_reg_model(self, func_name: str): return XGBModel case "LGBMRegressor": from darts.models import LightGBMModel - # if self.get_device_params().get("device") == "cuda": - # logger.info("\033[92mUsing CUDA for LGBMRegressor\033[0m") - # cuda_params = {"device": "cuda"} - # return partial(LightGBMModel, **cuda_params) + + if self.get_device_params().get("device") == "cuda": + logger.info("\033[92mUsing CUDA for LGBMRegressor\033[0m") + cuda_params = {"device": "cuda"} + return partial(LightGBMModel, **cuda_params) return LightGBMModel case "RandomForestRegressor": from darts.models import RandomForest @@ -143,7 +186,7 @@ def _fit_by_step(self, step): model.fit(self._target_train, past_covariates=self._past_cov) return model - def _predict_by_step(self, model, step: int, sequence_number: int): + def _predict(self, model, step: int, sequence_number: int=0): """ Keep predictions with last-month-with-data, i.e., diagonal prediction """ @@ -181,128 +224,162 @@ def _predict_by_step(self, model, step: int, sequence_number: int): ) return df_preds.sort_index() + + def _predict_by_step(self, step: int, sequence_number: int=0): + # Help function for parallel execution + return self._predict(self._models[step], step, sequence_number) def _predict_by_sequence(self, sequence_number): pred_by_step = [] for step in self._steps: - pred = self._predict_by_step(self._models[step], step, sequence_number) + pred = self._predict_by_step(step, sequence_number) pred_by_step.append(pred) return pd.concat(pred_by_step, axis=0).sort_index() - - # @views_validate - # def fit(self, df: pd.DataFrame): - # df = self._process_data(df) - # self._prepare_time_series(df) - # self._reg = self._resolve_reg_model(self._config["model_reg"]) - # for step in tqdm.tqdm( - # self._steps, desc="Fitting model for step", leave=True - # ): - # model = self._reg(lags_past_covariates=[-step], **self._reg_params) - # model.fit(self._target_train, - # past_covariates=self._past_cov) # Darts will automatically ignore the parts of past_covariates that go beyond the training period - # self._models[step] = model - # self.is_fitted_ = True - + @views_validate def fit(self, df: pd.DataFrame): df = self._process_data(df) self._prepare_time_series(df) self._reg = self._resolve_reg_model(self._config["model_reg"]) - models = {} - if self.get_device_params().get("device") == "cuda": - for step in tqdm.tqdm( - self._steps, desc="Fitting model for step", leave=True - ): - model = self._reg(lags_past_covariates=[-step], **self._reg_params) - model.fit(self._target_train, - past_covariates=self._past_cov) # Darts will automatically ignore the parts of past_covariates that go beyond the training period - self._models[step] = model - else: - with ProcessPoolExecutor() as executor: - futures = { - executor.submit(self._fit_by_step, step): step for step in self._steps - } - for future in tqdm.tqdm( - futures.keys(), desc="Fitting models for steps", total=len(futures) - ): - step = futures[future] - models[step] = future.result() - self._models = models # Use local variable to avoid concurrent execution issues + model_list = self._execute_parallel( + func=self._fit_by_step, + items=self._steps, + desc="Fitting models for steps" + ) + + self._models = {step: model for step, model in zip(self._steps, model_list)} self.is_fitted_ = True def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: check_is_fitted(self, "is_fitted_") if run_type != "forecasting": - - if eval_type == "standard": - total_sequence_number = ( - ForecastingModelManager._resolve_evaluation_sequence_number(eval_type) + total_sequence_number = ( + ForecastingModelManager._resolve_evaluation_sequence_number( + eval_type ) + ) + + sequence_numbers = list(range(total_sequence_number)) + preds = self._execute_parallel( + func=self._predict_by_sequence, + items=sequence_numbers, + desc="Predicting for sequence number" + ) + + else: + preds_by_step = self._execute_parallel( + func=self._predict_by_step, + items=self._steps, + desc="Predicting outcomes" + ) + + preds = pd.concat(preds_by_step, axis=0).sort_index() - if self.get_device_params().get("device") == "cuda": - preds = [] - for sequence_number in tqdm.tqdm( - range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), - desc="Predicting for sequence number", - ): - pred_by_step = [ - self._predict_by_step(self._models[step], step, sequence_number) - for step in self._steps - ] - pred = pd.concat(pred_by_step, axis=0) - preds.append(pred) - else: - preds = [None] * total_sequence_number - with ProcessPoolExecutor() as executor: - futures = { - executor.submit( - self._predict_by_sequence, sequence_number - ): sequence_number - for sequence_number in range(total_sequence_number) - } - - for future in tqdm.tqdm( - as_completed(futures.keys()), - desc="Predicting for sequence number", - total=len(futures), - ): - sequence_number = futures[future] - preds[sequence_number] = future.result() - else: - raise ValueError( - f"{eval_type} is not supported now. Please use 'standard' evaluation type." - ) + return preds + - else: + # @views_validate + # def fit(self, df: pd.DataFrame): + # df = self._process_data(df) + # self._prepare_time_series(df) + # self._reg = self._resolve_reg_model(self._config["model_reg"]) - if self.get_device_params().get("device") == "cuda": - preds = [] - for step in tqdm.tqdm(self._steps, desc="Predicting for steps"): - preds.append(self._predict_by_step(self._models[step], step, 0)) - preds = pd.concat(preds, axis=0).sort_index() - - else: - with ProcessPoolExecutor() as executor: - futures = { - step: executor.submit( - self._predict_by_step, self._models[step], step, 0 - ) - for step in self._steps - } - preds_by_step = [ - future.result() - for future in tqdm.tqdm( - as_completed(futures.values()), - desc="Predicting outcomes", - total=len(futures), - ) - ] - - preds = pd.concat(preds_by_step, axis=0).sort_index() + # models = {} + # if self.get_device_params().get("device") == "cuda": + # for step in tqdm.tqdm( + # self._steps, desc="Fitting model for step", leave=True + # ): + # model = self._reg(lags_past_covariates=[-step], **self._reg_params) + # model.fit(self._target_train, + # past_covariates=self._past_cov) # Darts will automatically ignore the parts of past_covariates that go beyond the training period + # self._models[step] = model + # else: + # with ProcessPoolExecutor() as executor: + # futures = { + # executor.submit(self._fit_by_step, step): step for step in self._steps + # } + # for future in tqdm.tqdm( + # futures.keys(), desc="Fitting models for steps", total=len(futures) + # ): + # step = futures[future] + # models[step] = future.result() + # self._models = models # Use local variable to avoid concurrent execution issues + # self.is_fitted_ = True - return preds + # def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: + # check_is_fitted(self, "is_fitted_") + + # if run_type != "forecasting": + + # if eval_type == "standard": + # total_sequence_number = ( + # ForecastingModelManager._resolve_evaluation_sequence_number(eval_type) + # ) + + # if self.get_device_params().get("device") == "cuda": + # preds = [] + # for sequence_number in tqdm.tqdm( + # range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), + # desc="Predicting for sequence number", + # ): + # pred_by_step = [ + # self._predict_by_step(self._models[step], step, sequence_number) + # for step in self._steps + # ] + # pred = pd.concat(pred_by_step, axis=0) + # preds.append(pred) + # else: + # preds = [None] * total_sequence_number + # with ProcessPoolExecutor() as executor: + # futures = { + # executor.submit( + # self._predict_by_sequence, sequence_number + # ): sequence_number + # for sequence_number in range(total_sequence_number) + # } + + # for future in tqdm.tqdm( + # as_completed(futures.keys()), + # desc="Predicting for sequence number", + # total=len(futures), + # ): + # sequence_number = futures[future] + # preds[sequence_number] = future.result() + # else: + # raise ValueError( + # f"{eval_type} is not supported now. Please use 'standard' evaluation type." + # ) + + # else: + + # if self.get_device_params().get("device") == "cuda": + # preds = [] + # for step in tqdm.tqdm(self._steps, desc="Predicting for steps"): + # preds.append(self._predict_by_step(step, 0)) + # preds = pd.concat(preds, axis=0).sort_index() + + # else: + # with ProcessPoolExecutor() as executor: + # futures = { + # step: executor.submit( + # self._predict_by_step, step, 0 + # ) + # for step in self._steps + # } + # preds_by_step = [ + # future.result() + # for future in tqdm.tqdm( + # as_completed(futures.values()), + # desc="Predicting outcomes", + # total=len(futures), + # ) + # ] + + # preds = pd.concat(preds_by_step, axis=0).sort_index() + + # return preds def save(self, path: str): try: From 1bfae830fbeb5bc0c77b9930ffb923a051e4d92d Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Thu, 13 Nov 2025 16:18:44 +0100 Subject: [PATCH 03/14] comment cuda for lightgbm --- views_stepshifter/models/hurdle_model.py | 1 + views_stepshifter/models/stepshifter.py | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/views_stepshifter/models/hurdle_model.py b/views_stepshifter/models/hurdle_model.py index e358f74..502b2fd 100644 --- a/views_stepshifter/models/hurdle_model.py +++ b/views_stepshifter/models/hurdle_model.py @@ -71,6 +71,7 @@ def _resolve_clf_model(self, func_name: str): ) def _fit_by_step(self, step): + logger.info(f"[Step {step}] Starting model fitting...") # Fit binary-like stage using a classification model binary_model = self._clf(lags_past_covariates=[-step], **self._clf_params) binary_model.fit(self._target_binary, past_covariates=self._past_cov) diff --git a/views_stepshifter/models/stepshifter.py b/views_stepshifter/models/stepshifter.py index 67afe8b..eb43c40 100644 --- a/views_stepshifter/models/stepshifter.py +++ b/views_stepshifter/models/stepshifter.py @@ -107,10 +107,11 @@ def _resolve_reg_model(self, func_name: str): case "LGBMRegressor": from darts.models import LightGBMModel - if self.get_device_params().get("device") == "cuda": - logger.info("\033[92mUsing CUDA for LGBMRegressor\033[0m") - cuda_params = {"device": "cuda"} - return partial(LightGBMModel, **cuda_params) + # TODO: Add CUDA support for LightGBMRegressor + # if self.get_device_params().get("device") == "cuda": + # logger.info("\033[92mUsing CUDA for LGBMRegressor\033[0m") + # cuda_params = {"device": "cuda"} + # return partial(LightGBMModel, **cuda_params) return LightGBMModel case "RandomForestRegressor": from darts.models import RandomForest @@ -182,6 +183,7 @@ def _prepare_time_series(self, df: pd.DataFrame): ] def _fit_by_step(self, step): + logger.info(f"[Step {step}] Starting model fitting...") model = self._reg(lags_past_covariates=[-step], **self._reg_params) model.fit(self._target_train, past_covariates=self._past_cov) return model @@ -190,7 +192,7 @@ def _predict(self, model, step: int, sequence_number: int=0): """ Keep predictions with last-month-with-data, i.e., diagonal prediction """ - # logger.info(f"Starting prediction for step: {step}") + logger.info(f"[Step {step}] Starting prediction...") target = [ series.slice(self._train_start, self._train_end + 1 + sequence_number)[ self._targets From 8310a5e77c3316871f7676da22e906b92ca0d64c Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Fri, 14 Nov 2025 11:42:36 +0100 Subject: [PATCH 04/14] Revert "comment cuda for lightgbm" This reverts commit 1bfae830fbeb5bc0c77b9930ffb923a051e4d92d. --- views_stepshifter/models/hurdle_model.py | 1 - views_stepshifter/models/stepshifter.py | 12 +++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/views_stepshifter/models/hurdle_model.py b/views_stepshifter/models/hurdle_model.py index 502b2fd..e358f74 100644 --- a/views_stepshifter/models/hurdle_model.py +++ b/views_stepshifter/models/hurdle_model.py @@ -71,7 +71,6 @@ def _resolve_clf_model(self, func_name: str): ) def _fit_by_step(self, step): - logger.info(f"[Step {step}] Starting model fitting...") # Fit binary-like stage using a classification model binary_model = self._clf(lags_past_covariates=[-step], **self._clf_params) binary_model.fit(self._target_binary, past_covariates=self._past_cov) diff --git a/views_stepshifter/models/stepshifter.py b/views_stepshifter/models/stepshifter.py index eb43c40..67afe8b 100644 --- a/views_stepshifter/models/stepshifter.py +++ b/views_stepshifter/models/stepshifter.py @@ -107,11 +107,10 @@ def _resolve_reg_model(self, func_name: str): case "LGBMRegressor": from darts.models import LightGBMModel - # TODO: Add CUDA support for LightGBMRegressor - # if self.get_device_params().get("device") == "cuda": - # logger.info("\033[92mUsing CUDA for LGBMRegressor\033[0m") - # cuda_params = {"device": "cuda"} - # return partial(LightGBMModel, **cuda_params) + if self.get_device_params().get("device") == "cuda": + logger.info("\033[92mUsing CUDA for LGBMRegressor\033[0m") + cuda_params = {"device": "cuda"} + return partial(LightGBMModel, **cuda_params) return LightGBMModel case "RandomForestRegressor": from darts.models import RandomForest @@ -183,7 +182,6 @@ def _prepare_time_series(self, df: pd.DataFrame): ] def _fit_by_step(self, step): - logger.info(f"[Step {step}] Starting model fitting...") model = self._reg(lags_past_covariates=[-step], **self._reg_params) model.fit(self._target_train, past_covariates=self._past_cov) return model @@ -192,7 +190,7 @@ def _predict(self, model, step: int, sequence_number: int=0): """ Keep predictions with last-month-with-data, i.e., diagonal prediction """ - logger.info(f"[Step {step}] Starting prediction...") + # logger.info(f"Starting prediction for step: {step}") target = [ series.slice(self._train_start, self._train_end + 1 + sequence_number)[ self._targets From 036613fe7fe0f91a5498318e854cdf4b8a7b9151 Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Fri, 14 Nov 2025 11:42:44 +0100 Subject: [PATCH 05/14] Revert "add threadpool" This reverts commit eb731e86f964499a877c9e2f7b09287a5ce9900d. --- views_stepshifter/models/hurdle_model.py | 306 +++++++++++------------ views_stepshifter/models/stepshifter.py | 289 ++++++++------------- 2 files changed, 247 insertions(+), 348 deletions(-) diff --git a/views_stepshifter/models/hurdle_model.py b/views_stepshifter/models/hurdle_model.py index e358f74..bf60c40 100644 --- a/views_stepshifter/models/hurdle_model.py +++ b/views_stepshifter/models/hurdle_model.py @@ -70,46 +70,53 @@ def _resolve_clf_model(self, func_name: str): f"Change the model in the config file." ) - def _fit_by_step(self, step): + def _fit_by_step(self, step, target_binary, target_pos, past_cov_pos): # Fit binary-like stage using a classification model binary_model = self._clf(lags_past_covariates=[-step], **self._clf_params) - binary_model.fit(self._target_binary, past_covariates=self._past_cov) + binary_model.fit(target_binary, past_covariates=self._past_cov) # Fit positive stage using the regression model positive_model = self._reg(lags_past_covariates=[-step], **self._reg_params) - positive_model.fit(self._target_pos, past_covariates=self._past_cov_pos) + positive_model.fit(target_pos, past_covariates=past_cov_pos) return (binary_model, positive_model) - def _predict_by_step(self, step: int, sequence_number: int=0): - # OVERRIDE: Help function for parallel execution - binary_model, positive_model = self._models[step] - pred_binary = self._predict(binary_model, step, sequence_number) - pred_positive = self._predict(positive_model, step, sequence_number) - return pred_binary.sort_index() * pred_positive.sort_index() - def _predict_by_sequence(self, sequence_number): - pred_by_step = [] + pred_by_step_binary = [] + pred_by_step_positive = [] + for step in self._steps: - pred = self._predict_by_step(step, sequence_number) - pred_by_step.append(pred) - return pd.concat(pred_by_step, axis=0).sort_index() + # Predict for binary model + pred_binary = self._predict_by_step(self._models[step][0], step, sequence_number) + pred_by_step_binary.append(pred_binary) + + # Predict for positive model + pred_positive = self._predict_by_step(self._models[step][1], step, sequence_number) + pred_by_step_positive.append(pred_positive) + + final_pred = ( + pd.concat(pred_by_step_binary, axis=0).sort_index() * + pd.concat(pred_by_step_positive, axis=0).sort_index() + ) + return final_pred + @views_validate def fit(self, df: pd.DataFrame): df = self._process_data(df) self._prepare_time_series(df) self._clf = self._resolve_clf_model(self._config["model_clf"]) self._reg = self._resolve_reg_model(self._config["model_reg"]) + # Binary outcome (event/no-event) # According to the DARTS doc, if timeseries uses a numeric type different from np.float32 or np.float64, not all functionalities may work properly. # So use astype(float) instead of astype(int) (we should have binary outputs 0,1 though) - self._target_binary = [ + target_binary = [ s.map(lambda x: (x > 0).astype(float)) for s in self._target_train ] # Positive outcome (for cases where target > 0) - self._target_pos, self._past_cov_pos = zip( + target_pos, past_cov_pos = zip( *[ (t, p) for t, p in zip(self._target_train, self._past_cov) @@ -117,159 +124,128 @@ def fit(self, df: pd.DataFrame): ] ) - model_list = self._execute_parallel( - func=self._fit_by_step, - items=self._steps, - desc="Fitting models for steps" - ) - self._models = {step: model for step, model in zip(self._steps, model_list)} + if self.get_device_params().get("device") == "cuda": + for step in tqdm.tqdm(self._steps, desc="Fitting model for step", leave=True): + # Fit binary-like stage using a classification model, but the target is binary (0 or 1) + binary_model = self._clf(lags_past_covariates=[-step], **self._clf_params) + binary_model.fit(target_binary, past_covariates=self._past_cov) + + # Fit positive stage using the regression model + positive_model = self._reg(lags_past_covariates=[-step], **self._reg_params) + positive_model.fit(target_pos, past_covariates=past_cov_pos) + self._models[step] = (binary_model, positive_model) + else: + models = {} + with ProcessPoolExecutor() as executor: + futures = { + executor.submit(self._fit_by_step, step, target_binary, target_pos, past_cov_pos): step + for step in self._steps + } + for future in tqdm.tqdm(as_completed(futures.keys()), desc="Fitting models for steps", total=len(futures)): + step = futures[future] + models[step] = future.result() + self._models = models self.is_fitted_ = True - # @views_validate - # def fit(self, df: pd.DataFrame): - # df = self._process_data(df) - # self._prepare_time_series(df) - # self._clf = self._resolve_clf_model(self._config["model_clf"]) - # self._reg = self._resolve_reg_model(self._config["model_reg"]) - - # # Binary outcome (event/no-event) - # # According to the DARTS doc, if timeseries uses a numeric type different from np.float32 or np.float64, not all functionalities may work properly. - # # So use astype(float) instead of astype(int) (we should have binary outputs 0,1 though) - # target_binary = [ - # s.map(lambda x: (x > 0).astype(float)) for s in self._target_train - # ] - - # # Positive outcome (for cases where target > 0) - # target_pos, past_cov_pos = zip( - # *[ - # (t, p) - # for t, p in zip(self._target_train, self._past_cov) - # if (t.values() > 0).any() - # ] - # ) - - # if self.get_device_params().get("device") == "cuda": - # for step in tqdm.tqdm(self._steps, desc="Fitting model for step", leave=True): - # # Fit binary-like stage using a classification model, but the target is binary (0 or 1) - # binary_model = self._clf(lags_past_covariates=[-step], **self._clf_params) - # binary_model.fit(target_binary, past_covariates=self._past_cov) - - # # Fit positive stage using the regression model - # positive_model = self._reg(lags_past_covariates=[-step], **self._reg_params) - # positive_model.fit(target_pos, past_covariates=past_cov_pos) - # self._models[step] = (binary_model, positive_model) - # else: - # models = {} - # with ProcessPoolExecutor() as executor: - # futures = { - # executor.submit(self._fit_by_step, step, target_binary, target_pos, past_cov_pos): step - # for step in self._steps - # } - # for future in tqdm.tqdm(as_completed(futures.keys()), desc="Fitting models for steps", total=len(futures)): - # step = futures[future] - # models[step] = future.result() - # self._models = models - # self.is_fitted_ = True + def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: + check_is_fitted(self, "is_fitted_") - # def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: - # check_is_fitted(self, "is_fitted_") + if run_type != "forecasting": - # if run_type != "forecasting": - - # if eval_type == "standard": - # total_sequence_number = ( - # ForecastingModelManager._resolve_evaluation_sequence_number(eval_type) - # ) - # if self.get_device_params().get("device") == "cuda": - # preds = [] - # for sequence_number in tqdm.tqdm( - # range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), - # desc="Predicting for sequence number", - # ): - # pred_by_step_binary = [ - # self._predict_by_step( - # self._models[step][0], step, sequence_number - # ) - # for step in self._steps - # ] - # pred_by_step_positive = [ - # self._predict_by_step( - # self._models[step][1], step, sequence_number - # ) - # for step in self._steps - # ] - # pred = pd.concat(pred_by_step_binary, axis=0) * pd.concat(pred_by_step_positive, axis=0) - # preds.append(pred) - - # else: - # preds = [None] * total_sequence_number - # with ProcessPoolExecutor() as executor: - # futures = { - # executor.submit(self._predict_by_sequence, sequence_number): sequence_number - # for sequence_number in range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)) - # } - # for future in tqdm.tqdm( - # as_completed(futures.keys()), - # desc="Predicting for sequence number", - # total=len(futures), - # ): - # sequence_number = futures[future] - # preds[sequence_number] = future.result() - # else: - # raise ValueError( - # f"{eval_type} is not supported now. Please use 'standard' evaluation type." - # ) + if eval_type == "standard": + total_sequence_number = ( + ForecastingModelManager._resolve_evaluation_sequence_number(eval_type) + ) + if self.get_device_params().get("device") == "cuda": + preds = [] + for sequence_number in tqdm.tqdm( + range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), + desc="Predicting for sequence number", + ): + pred_by_step_binary = [ + self._predict_by_step( + self._models[step][0], step, sequence_number + ) + for step in self._steps + ] + pred_by_step_positive = [ + self._predict_by_step( + self._models[step][1], step, sequence_number + ) + for step in self._steps + ] + pred = pd.concat(pred_by_step_binary, axis=0) * pd.concat(pred_by_step_positive, axis=0) + preds.append(pred) + + else: + preds = [None] * total_sequence_number + with ProcessPoolExecutor() as executor: + futures = { + executor.submit(self._predict_by_sequence, sequence_number): sequence_number + for sequence_number in range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)) + } + for future in tqdm.tqdm( + as_completed(futures.keys()), + desc="Predicting for sequence number", + total=len(futures), + ): + sequence_number = futures[future] + preds[sequence_number] = future.result() + else: + raise ValueError( + f"{eval_type} is not supported now. Please use 'standard' evaluation type." + ) - # else: - # if self.get_device_params().get("device") == "cuda": - # pred_by_step_binary = [] - # pred_by_step_positive = [] - # for step in tqdm.tqdm(self._steps, desc="Predicting for step", total=len(self._steps)): - # pred_by_step_binary.append( - # self._predict_by_step(self._models[step][0], step, 0) - # ) - # pred_by_step_positive.append( - # self._predict_by_step(self._models[step][1], step, 0) - # ) + else: + if self.get_device_params().get("device") == "cuda": + pred_by_step_binary = [] + pred_by_step_positive = [] + for step in tqdm.tqdm(self._steps, desc="Predicting for step", total=len(self._steps)): + pred_by_step_binary.append( + self._predict_by_step(self._models[step][0], step, 0) + ) + pred_by_step_positive.append( + self._predict_by_step(self._models[step][1], step, 0) + ) - # preds = pd.concat(pred_by_step_binary, axis=0) * pd.concat( - # pred_by_step_positive, axis=0 - # ) + preds = pd.concat(pred_by_step_binary, axis=0) * pd.concat( + pred_by_step_positive, axis=0 + ) - # else: - # with ProcessPoolExecutor() as executor: - # futures_binary = { - # step: executor.submit( - # self._predict_by_step, self._models[step][0], step, 0 - # ) - # for step in self._steps - # } - # futures_positive = { - # step: executor.submit( - # self._predict_by_step, self._models[step][1], step, 0 - # ) - # for step in self._steps - # } - - # pred_by_step_binary = [ - # future.result() - # for future in tqdm.tqdm( - # as_completed(futures_binary.values()), - # desc="Predicting binary outcomes", - # total=len(futures_binary), - # ) - # ] - # pred_by_step_positive = [ - # future.result() - # for future in tqdm.tqdm( - # as_completed(futures_positive.values()), - # desc="Predicting positive outcomes", - # total=len(futures_positive), - # ) - # ] - - # preds = ( - # pd.concat(pred_by_step_binary, axis=0).sort_index() - # * pd.concat(pred_by_step_positive, axis=0).sort_index() - # ) - # return preds + else: + with ProcessPoolExecutor() as executor: + futures_binary = { + step: executor.submit( + self._predict_by_step, self._models[step][0], step, 0 + ) + for step in self._steps + } + futures_positive = { + step: executor.submit( + self._predict_by_step, self._models[step][1], step, 0 + ) + for step in self._steps + } + + pred_by_step_binary = [ + future.result() + for future in tqdm.tqdm( + as_completed(futures_binary.values()), + desc="Predicting binary outcomes", + total=len(futures_binary), + ) + ] + pred_by_step_positive = [ + future.result() + for future in tqdm.tqdm( + as_completed(futures_positive.values()), + desc="Predicting positive outcomes", + total=len(futures_positive), + ) + ] + + preds = ( + pd.concat(pred_by_step_binary, axis=0).sort_index() + * pd.concat(pred_by_step_positive, axis=0).sort_index() + ) + return preds diff --git a/views_stepshifter/models/stepshifter.py b/views_stepshifter/models/stepshifter.py index 67afe8b..264d97b 100644 --- a/views_stepshifter/models/stepshifter.py +++ b/views_stepshifter/models/stepshifter.py @@ -4,11 +4,11 @@ import logging from darts import TimeSeries from sklearn.utils.validation import check_is_fitted -from typing import List, Dict, Callable +from typing import List, Dict from views_stepshifter.models.validation import views_validate from views_pipeline_core.managers.model import ModelManager, ForecastingModelManager import tqdm -from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor +from concurrent.futures import ProcessPoolExecutor, as_completed import torch from functools import partial @@ -41,46 +41,6 @@ def get_device_params(): else: return {} - def _execute_parallel(self, func: Callable, items: List, desc: str, max_workers: int=None): - """ - Executes a function in parallel, automatically choosing - ThreadPoolExecutor for CUDA or ProcessPoolExecutor for CPU. - - Args: - func: The function to execute. - items: A list of items to submit to the function. - desc: The description for the tqdm progress bar. - max_workers: The max_workers for the ThreadPoolExecutor (for CUDA). - ProcessPoolExecutor will use its default. - Returns: - A list of results, in the order the items were submitted. - """ - if self.get_device_params().get("device") == "cuda": - ExecutorClass = ThreadPoolExecutor - mw = max_workers if max_workers is not None else len(items) - executor_kwargs = {"max_workers": mw} - desc_prefix = "GPU" - else: - ExecutorClass = ProcessPoolExecutor - executor_kwargs = {} - desc_prefix = "CPU" - - # Store results by their original index to return them in order - futures = {} - results = [None] * len(items) - - with ExecutorClass(**executor_kwargs) as executor: - for index, item in enumerate(items): - future = executor.submit(func, item) - futures[future] = index - - full_desc = f"{desc_prefix}: {desc}" - for future in tqdm.tqdm(as_completed(futures.keys()), desc=full_desc, total=len(futures)): - index = futures[future] - results[index] = future.result() - - return results - def _resolve_reg_model(self, func_name: str): """ Lookup table for supported regression models @@ -90,7 +50,6 @@ def _resolve_reg_model(self, func_name: str): match func_name: case "XGBRFRegressor": from views_stepshifter.models.darts_model import XGBRFModel - if self.get_device_params().get("device") == "cuda": logger.info("\033[92mUsing CUDA for XGBRFRegressor\033[0m") cuda_params = {"tree_method": "hist", "device": "cuda"} @@ -98,7 +57,6 @@ def _resolve_reg_model(self, func_name: str): return XGBRFModel case "XGBRegressor": from darts.models import XGBModel - if self.get_device_params().get("device") == "cuda": logger.info("\033[92mUsing CUDA for XGBRegressor\033[0m") cuda_params = {"tree_method": "hist", "device": "cuda"} @@ -106,11 +64,10 @@ def _resolve_reg_model(self, func_name: str): return XGBModel case "LGBMRegressor": from darts.models import LightGBMModel - - if self.get_device_params().get("device") == "cuda": - logger.info("\033[92mUsing CUDA for LGBMRegressor\033[0m") - cuda_params = {"device": "cuda"} - return partial(LightGBMModel, **cuda_params) + # if self.get_device_params().get("device") == "cuda": + # logger.info("\033[92mUsing CUDA for LGBMRegressor\033[0m") + # cuda_params = {"device": "cuda"} + # return partial(LightGBMModel, **cuda_params) return LightGBMModel case "RandomForestRegressor": from darts.models import RandomForest @@ -186,7 +143,7 @@ def _fit_by_step(self, step): model.fit(self._target_train, past_covariates=self._past_cov) return model - def _predict(self, model, step: int, sequence_number: int=0): + def _predict_by_step(self, model, step: int, sequence_number: int): """ Keep predictions with last-month-with-data, i.e., diagonal prediction """ @@ -224,162 +181,128 @@ def _predict(self, model, step: int, sequence_number: int=0): ) return df_preds.sort_index() - - def _predict_by_step(self, step: int, sequence_number: int=0): - # Help function for parallel execution - return self._predict(self._models[step], step, sequence_number) def _predict_by_sequence(self, sequence_number): pred_by_step = [] for step in self._steps: - pred = self._predict_by_step(step, sequence_number) + pred = self._predict_by_step(self._models[step], step, sequence_number) pred_by_step.append(pred) return pd.concat(pred_by_step, axis=0).sort_index() - + + # @views_validate + # def fit(self, df: pd.DataFrame): + # df = self._process_data(df) + # self._prepare_time_series(df) + # self._reg = self._resolve_reg_model(self._config["model_reg"]) + # for step in tqdm.tqdm( + # self._steps, desc="Fitting model for step", leave=True + # ): + # model = self._reg(lags_past_covariates=[-step], **self._reg_params) + # model.fit(self._target_train, + # past_covariates=self._past_cov) # Darts will automatically ignore the parts of past_covariates that go beyond the training period + # self._models[step] = model + # self.is_fitted_ = True + @views_validate def fit(self, df: pd.DataFrame): df = self._process_data(df) self._prepare_time_series(df) self._reg = self._resolve_reg_model(self._config["model_reg"]) - model_list = self._execute_parallel( - func=self._fit_by_step, - items=self._steps, - desc="Fitting models for steps" - ) - - self._models = {step: model for step, model in zip(self._steps, model_list)} + models = {} + if self.get_device_params().get("device") == "cuda": + for step in tqdm.tqdm( + self._steps, desc="Fitting model for step", leave=True + ): + model = self._reg(lags_past_covariates=[-step], **self._reg_params) + model.fit(self._target_train, + past_covariates=self._past_cov) # Darts will automatically ignore the parts of past_covariates that go beyond the training period + self._models[step] = model + else: + with ProcessPoolExecutor() as executor: + futures = { + executor.submit(self._fit_by_step, step): step for step in self._steps + } + for future in tqdm.tqdm( + futures.keys(), desc="Fitting models for steps", total=len(futures) + ): + step = futures[future] + models[step] = future.result() + self._models = models # Use local variable to avoid concurrent execution issues self.is_fitted_ = True def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: check_is_fitted(self, "is_fitted_") if run_type != "forecasting": - total_sequence_number = ( - ForecastingModelManager._resolve_evaluation_sequence_number( - eval_type - ) - ) - - sequence_numbers = list(range(total_sequence_number)) - preds = self._execute_parallel( - func=self._predict_by_sequence, - items=sequence_numbers, - desc="Predicting for sequence number" - ) - - else: - preds_by_step = self._execute_parallel( - func=self._predict_by_step, - items=self._steps, - desc="Predicting outcomes" - ) - - preds = pd.concat(preds_by_step, axis=0).sort_index() - return preds - + if eval_type == "standard": + total_sequence_number = ( + ForecastingModelManager._resolve_evaluation_sequence_number(eval_type) + ) - # @views_validate - # def fit(self, df: pd.DataFrame): - # df = self._process_data(df) - # self._prepare_time_series(df) - # self._reg = self._resolve_reg_model(self._config["model_reg"]) + if self.get_device_params().get("device") == "cuda": + preds = [] + for sequence_number in tqdm.tqdm( + range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), + desc="Predicting for sequence number", + ): + pred_by_step = [ + self._predict_by_step(self._models[step], step, sequence_number) + for step in self._steps + ] + pred = pd.concat(pred_by_step, axis=0) + preds.append(pred) + else: + preds = [None] * total_sequence_number + with ProcessPoolExecutor() as executor: + futures = { + executor.submit( + self._predict_by_sequence, sequence_number + ): sequence_number + for sequence_number in range(total_sequence_number) + } + + for future in tqdm.tqdm( + as_completed(futures.keys()), + desc="Predicting for sequence number", + total=len(futures), + ): + sequence_number = futures[future] + preds[sequence_number] = future.result() + else: + raise ValueError( + f"{eval_type} is not supported now. Please use 'standard' evaluation type." + ) - # models = {} - # if self.get_device_params().get("device") == "cuda": - # for step in tqdm.tqdm( - # self._steps, desc="Fitting model for step", leave=True - # ): - # model = self._reg(lags_past_covariates=[-step], **self._reg_params) - # model.fit(self._target_train, - # past_covariates=self._past_cov) # Darts will automatically ignore the parts of past_covariates that go beyond the training period - # self._models[step] = model - # else: - # with ProcessPoolExecutor() as executor: - # futures = { - # executor.submit(self._fit_by_step, step): step for step in self._steps - # } - # for future in tqdm.tqdm( - # futures.keys(), desc="Fitting models for steps", total=len(futures) - # ): - # step = futures[future] - # models[step] = future.result() - # self._models = models # Use local variable to avoid concurrent execution issues - # self.is_fitted_ = True + else: - # def predict(self, run_type: str, eval_type: str = "standard") -> pd.DataFrame: - # check_is_fitted(self, "is_fitted_") - - # if run_type != "forecasting": - - # if eval_type == "standard": - # total_sequence_number = ( - # ForecastingModelManager._resolve_evaluation_sequence_number(eval_type) - # ) - - # if self.get_device_params().get("device") == "cuda": - # preds = [] - # for sequence_number in tqdm.tqdm( - # range(ForecastingModelManager._resolve_evaluation_sequence_number(eval_type)), - # desc="Predicting for sequence number", - # ): - # pred_by_step = [ - # self._predict_by_step(self._models[step], step, sequence_number) - # for step in self._steps - # ] - # pred = pd.concat(pred_by_step, axis=0) - # preds.append(pred) - # else: - # preds = [None] * total_sequence_number - # with ProcessPoolExecutor() as executor: - # futures = { - # executor.submit( - # self._predict_by_sequence, sequence_number - # ): sequence_number - # for sequence_number in range(total_sequence_number) - # } - - # for future in tqdm.tqdm( - # as_completed(futures.keys()), - # desc="Predicting for sequence number", - # total=len(futures), - # ): - # sequence_number = futures[future] - # preds[sequence_number] = future.result() - # else: - # raise ValueError( - # f"{eval_type} is not supported now. Please use 'standard' evaluation type." - # ) - - # else: - - # if self.get_device_params().get("device") == "cuda": - # preds = [] - # for step in tqdm.tqdm(self._steps, desc="Predicting for steps"): - # preds.append(self._predict_by_step(step, 0)) - # preds = pd.concat(preds, axis=0).sort_index() + if self.get_device_params().get("device") == "cuda": + preds = [] + for step in tqdm.tqdm(self._steps, desc="Predicting for steps"): + preds.append(self._predict_by_step(self._models[step], step, 0)) + preds = pd.concat(preds, axis=0).sort_index() - # else: - # with ProcessPoolExecutor() as executor: - # futures = { - # step: executor.submit( - # self._predict_by_step, step, 0 - # ) - # for step in self._steps - # } - # preds_by_step = [ - # future.result() - # for future in tqdm.tqdm( - # as_completed(futures.values()), - # desc="Predicting outcomes", - # total=len(futures), - # ) - # ] - - # preds = pd.concat(preds_by_step, axis=0).sort_index() - - # return preds + else: + with ProcessPoolExecutor() as executor: + futures = { + step: executor.submit( + self._predict_by_step, self._models[step], step, 0 + ) + for step in self._steps + } + preds_by_step = [ + future.result() + for future in tqdm.tqdm( + as_completed(futures.values()), + desc="Predicting outcomes", + total=len(futures), + ) + ] + + preds = pd.concat(preds_by_step, axis=0).sort_index() + + return preds def save(self, path: str): try: From 9ff304e4094d46255b302dbe780d96bb252705b3 Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Mon, 17 Nov 2025 15:09:44 +0100 Subject: [PATCH 06/14] use classification models from darts --- views_stepshifter/models/__init__.py | 10 +- views_stepshifter/models/darts_model.py | 1237 +--------------------- views_stepshifter/models/hurdle_model.py | 26 +- views_stepshifter/models/stepshifter.py | 26 +- 4 files changed, 45 insertions(+), 1254 deletions(-) diff --git a/views_stepshifter/models/__init__.py b/views_stepshifter/models/__init__.py index 2680e7f..4006582 100644 --- a/views_stepshifter/models/__init__.py +++ b/views_stepshifter/models/__init__.py @@ -1,18 +1,10 @@ -from views_stepshifter.models.darts_model import ( - XGBClassifierModel, - XGBRFClassifierModel, - XGBRFModel, - LightGBMClassifierModel, - RandomForestClassifierModel) +from views_stepshifter.models.darts_model import XGBRFClassifierModel, XGBRFModel from views_stepshifter.models.hurdle_model import HurdleModel from views_stepshifter.models.stepshifter import StepshifterModel __all__ = [ - "XGBClassifierModel", "XGBRFClassifierModel", "XGBRFModel", - "LightGBMClassifierModel", - "RandomForestClassifierModel", "HurdleModel", "StepshifterModel" ] \ No newline at end of file diff --git a/views_stepshifter/models/darts_model.py b/views_stepshifter/models/darts_model.py index ffed8bd..365b6a3 100644 --- a/views_stepshifter/models/darts_model.py +++ b/views_stepshifter/models/darts_model.py @@ -1,1221 +1,30 @@ -from collections.abc import Sequence -from functools import partial -from typing import Optional, Union - -import numpy as np import xgboost as xgb -import lightgbm as lgb -from sklearn.ensemble import RandomForestClassifier - -from darts.logging import get_logger, raise_if_not -from darts.models.forecasting.regression_model import ( - FUTURE_LAGS_TYPE, - LAGS_TYPE, - RegressionModel, - _LikelihoodMixin, - RegressionModelWithCategoricalCovariates, -) -from darts.timeseries import TimeSeries - -logger = get_logger(__name__) - -# Check whether we are running xgboost >= 2.0.0 for quantile regression -tokens = xgb.__version__.split(".") -xgb_200_or_above = int(tokens[0]) >= 2 - -def xgb_quantile_loss(labels: np.ndarray, preds: np.ndarray, quantile: float): - """Custom loss function for XGBoost to compute quantile loss gradient. +from darts.models import XGBModel, XGBClassifierModel - Inspired from: https://gist.github.com/Nikolay-Lysenko/06769d701c1d9c9acb9a66f2f9d7a6c7 - - This computes the gradient of the pinball loss between predictions and target labels. +class XGBRFModel(XGBModel): """ - raise_if_not(0 <= quantile <= 1, "Quantile must be between 0 and 1.", logger) - - errors = preds - labels - left_mask = errors < 0 - right_mask = errors > 0 - - grad = -quantile * left_mask + (1 - quantile) * right_mask - hess = np.ones_like(preds) - - return grad, hess - -class XGBRFModel(RegressionModel, _LikelihoodMixin): - def __init__( - self, - lags: Optional[LAGS_TYPE] = None, - lags_past_covariates: Optional[LAGS_TYPE] = None, - lags_future_covariates: Optional[FUTURE_LAGS_TYPE] = None, - output_chunk_length: int = 1, - output_chunk_shift: int = 0, - add_encoders: Optional[dict] = None, - likelihood: Optional[str] = None, - quantiles: Optional[list[float]] = None, - random_state: Optional[int] = None, - multi_models: Optional[bool] = True, - use_static_covariates: bool = True, - **kwargs, - ): - """XGBoost Model - - Parameters - ---------- - lags - Lagged target `series` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags` past lags; e.g. `(-1, -2, ..., -lags)`, where `0` - corresponds the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `series` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_past_covariates - Lagged `past_covariates` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags_past_covariates` past lags; e.g. `(-1, -2, ..., -lags)`, - where `0` corresponds to the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `past_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_future_covariates - Lagged `future_covariates` values used to predict the next time step/s. The lags are always relative to the - first step in the output chunk, even when `output_chunk_shift > 0`. - If a tuple of `(past, future)`, both values must be > 0. Uses the last `n=past` past lags and `n=future` - future lags; e.g. `(-past, -(past - 1), ..., -1, 0, 1, .... future - 1)`, where `0` corresponds the first - predicted time step of each sample. If `output_chunk_shift > 0`, the position of negative lags differ from - those of `lags` and `lags_past_covariates`. In this case a future lag `-5` would point at the same - step as a target lag of `-5 + output_chunk_shift`. - If a list of integers, uses only the specified values as lags. - If a dictionary, the keys correspond to the `future_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (tuple or list of integers). The key - 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - output_chunk_length - Number of time steps predicted at once (per chunk) by the internal model. It is not the same as forecast - horizon `n` used in `predict()`, which is the desired number of prediction points generated using a - one-shot- or autoregressive forecast. Setting `n <= output_chunk_length` prevents auto-regression. This is - useful when the covariates don't extend far enough into the future, or to prohibit the model from using - future values of past and / or future covariates for prediction (depending on the model's covariate - support). - output_chunk_shift - Optionally, the number of steps to shift the start of the output chunk into the future (relative to the - input chunk end). This will create a gap between the input (history of target and past covariates) and - output. If the model supports `future_covariates`, the `lags_future_covariates` are relative to the first - step in the shifted output chunk. Predictions will start `output_chunk_shift` steps after the end of the - target `series`. If `output_chunk_shift` is set, the model cannot generate autoregressive predictions - (`n > output_chunk_length`). - add_encoders - A large number of past and future covariates can be automatically generated with `add_encoders`. - This can be done by adding multiple pre-defined index encoders and/or custom user-made functions that - will be used as index encoders. Additionally, a transformer such as Darts' :class:`Scaler` can be added to - transform the generated covariates. This happens all under one hood and only needs to be specified at - model creation. - Read :meth:`SequentialEncoder ` to find out more about - ``add_encoders``. Default: ``None``. An example showing some of ``add_encoders`` features: - - .. highlight:: python - .. code-block:: python - - def encode_year(idx): - return (idx.year - 1950) / 50 - - add_encoders={ - 'cyclic': {'future': ['month']}, - 'datetime_attribute': {'future': ['hour', 'dayofweek']}, - 'position': {'past': ['relative'], 'future': ['relative']}, - 'custom': {'past': [encode_year]}, - 'transformer': Scaler(), - 'tz': 'CET' - } - .. - likelihood - Can be set to `poisson` or `quantile`. If set, the model will be probabilistic, allowing sampling at - prediction time. This will overwrite any `objective` parameter. - quantiles - Fit the model to these quantiles if the `likelihood` is set to `quantile`. - random_state - Control the randomness in the fitting procedure and for sampling. - Default: ``None``. - multi_models - If True, a separate model will be trained for each future lag to predict. If False, a single model - is trained to predict all the steps in 'output_chunk_length' (features lags are shifted back by - `output_chunk_length - n` for each step `n`). Default: True. - use_static_covariates - Whether the model should use static covariate information in case the input `series` passed to ``fit()`` - contain static covariates. If ``True``, and static covariates are available at fitting time, will enforce - that all target `series` have the same static covariate dimensionality in ``fit()`` and ``predict()``. - **kwargs - Additional keyword arguments passed to `xgb.XGBRegressor`. - - Examples - -------- - Deterministic forecasting, using past/future covariates (optional) - - >>> from darts.datasets import WeatherDataset - >>> from darts.models import XGBModel - >>> series = WeatherDataset().load() - >>> # predicting atmospheric pressure - >>> target = series['p (mbar)'][:100] - >>> # optionally, use past observed rainfall (pretending to be unknown beyond index 100) - >>> past_cov = series['rain (mm)'][:100] - >>> # optionally, use future temperatures (pretending this component is a forecast) - >>> future_cov = series['T (degC)'][:106] - >>> # predict 6 pressure values using the 12 past values of pressure and rainfall, as well as the 6 temperature - >>> # values corresponding to the forecasted period - >>> model = XGBModel( - >>> lags=12, - >>> lags_past_covariates=12, - >>> lags_future_covariates=[0,1,2,3,4,5], - >>> output_chunk_length=6, - >>> ) - >>> model.fit(target, past_covariates=past_cov, future_covariates=future_cov) - >>> pred = model.predict(6) - >>> pred.values() - array([[1005.9185 ], - [1005.8315 ], - [1005.7878 ], - [1005.72626], - [1005.7475 ], - [1005.76074]]) - """ - kwargs["random_state"] = random_state # seed for tree learner - self.kwargs = kwargs - self._median_idx = None - self._model_container = None - self.quantiles = None - self._likelihood = likelihood - self._rng = None - - # parse likelihood - available_likelihoods = ["poisson", "quantile"] # to be extended - if likelihood is not None: - self._check_likelihood(likelihood, available_likelihoods) - if likelihood in {"poisson"}: - self.kwargs["objective"] = f"count:{likelihood}" - elif likelihood == "quantile": - if xgb_200_or_above: - # leverage built-in Quantile Regression - self.kwargs["objective"] = "reg:quantileerror" - self.quantiles, self._median_idx = self._prepare_quantiles(quantiles) - self._model_container = self._get_model_container() - - self._rng = np.random.default_rng(seed=random_state) # seed for sampling - - super().__init__( - lags=lags, - lags_past_covariates=lags_past_covariates, - lags_future_covariates=lags_future_covariates, - output_chunk_length=output_chunk_length, - output_chunk_shift=output_chunk_shift, - add_encoders=add_encoders, - multi_models=multi_models, - model=xgb.XGBRFRegressor(**self.kwargs), - use_static_covariates=use_static_covariates, - ) - - def fit( - self, - series: Union[TimeSeries, Sequence[TimeSeries]], - past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - max_samples_per_ts: Optional[int] = None, - n_jobs_multioutput_wrapper: Optional[int] = None, - sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, - val_sample_weight: Optional[ - Union[TimeSeries, Sequence[TimeSeries], str] - ] = None, - **kwargs, - ): - """ - Fits/trains the model using the provided list of features time series and the target time series. - - Parameters - ---------- - series - TimeSeries or Sequence[TimeSeries] object containing the target values. - past_covariates - Optionally, a series or sequence of series specifying past-observed covariates - future_covariates - Optionally, a series or sequence of series specifying future-known covariates - val_series - TimeSeries or Sequence[TimeSeries] object containing the target values for evaluation dataset - val_past_covariates - Optionally, a series or sequence of series specifying past-observed covariates for evaluation dataset - val_future_covariates : - Optionally, a series or sequence of series specifying future-known covariates for evaluation dataset - max_samples_per_ts - This is an integer upper bound on the number of tuples that can be produced - per time series. It can be used in order to have an upper bound on the total size of the dataset and - ensure proper sampling. If `None`, it will read all of the individual time series in advance (at dataset - creation) to know their sizes, which might be expensive on big datasets. - If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the - most recent `max_samples_per_ts` samples will be considered. - n_jobs_multioutput_wrapper - Number of jobs of the MultiOutputRegressor wrapper to run in parallel. Only used if the model doesn't - support multi-output regression natively. - sample_weight - Optionally, some sample weights to apply to the target `series` labels. They are applied per observation, - per label (each step in `output_chunk_length`), and per component. - If a series or sequence of series, then those weights are used. If the weight series only have a single - component / column, then the weights are applied globally to all components in `series`. Otherwise, for - component-specific weights, the number of components must match those of `series`. - If a string, then the weights are generated using built-in weighting functions. The available options are - `"linear"` or `"exponential"` decay - the further in the past, the lower the weight. The weights are - computed globally based on the length of the longest series in `series`. Then for each series, the weights - are extracted from the end of the global weights. This gives a common time weighting across all series. - val_sample_weight - Same as for `sample_weight` but for the evaluation dataset. - **kwargs - Additional kwargs passed to `xgb.XGBRegressor.fit()` - """ - # TODO: XGBRegressor supports multi quantile reqression which we could leverage in the future - # see https://xgboost.readthedocs.io/en/latest/python/examples/quantile_regression.html - - super().fit( - series=series, - past_covariates=past_covariates, - future_covariates=future_covariates, - val_series=val_series, - val_past_covariates=val_past_covariates, - val_future_covariates=val_future_covariates, - max_samples_per_ts=max_samples_per_ts, - n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper, - sample_weight=sample_weight, - val_sample_weight=val_sample_weight, - **kwargs, - ) - return self + A Darts wrapper for the XGBoost Random Forest Regressor. + This class inherits all functionality from darts.models.XGBModel + and simply replaces the underlying model with + `xgb.XGBRFRegressor`. + """ -class XGBRFClassifierModel(RegressionModel, _LikelihoodMixin): - def __init__( - self, - lags: Optional[LAGS_TYPE] = None, - lags_past_covariates: Optional[LAGS_TYPE] = None, - lags_future_covariates: Optional[FUTURE_LAGS_TYPE] = None, - output_chunk_length: int = 1, - output_chunk_shift: int = 0, - add_encoders: Optional[dict] = None, - likelihood: Optional[str] = None, - quantiles: Optional[list[float]] = None, - random_state: Optional[int] = None, - multi_models: Optional[bool] = True, - use_static_covariates: bool = True, - **kwargs, - ): - """XGBoost Model - - Parameters - ---------- - lags - Lagged target `series` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags` past lags; e.g. `(-1, -2, ..., -lags)`, where `0` - corresponds the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `series` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_past_covariates - Lagged `past_covariates` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags_past_covariates` past lags; e.g. `(-1, -2, ..., -lags)`, - where `0` corresponds to the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `past_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_future_covariates - Lagged `future_covariates` values used to predict the next time step/s. The lags are always relative to the - first step in the output chunk, even when `output_chunk_shift > 0`. - If a tuple of `(past, future)`, both values must be > 0. Uses the last `n=past` past lags and `n=future` - future lags; e.g. `(-past, -(past - 1), ..., -1, 0, 1, .... future - 1)`, where `0` corresponds the first - predicted time step of each sample. If `output_chunk_shift > 0`, the position of negative lags differ from - those of `lags` and `lags_past_covariates`. In this case a future lag `-5` would point at the same - step as a target lag of `-5 + output_chunk_shift`. - If a list of integers, uses only the specified values as lags. - If a dictionary, the keys correspond to the `future_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (tuple or list of integers). The key - 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - output_chunk_length - Number of time steps predicted at once (per chunk) by the internal model. It is not the same as forecast - horizon `n` used in `predict()`, which is the desired number of prediction points generated using a - one-shot- or autoregressive forecast. Setting `n <= output_chunk_length` prevents auto-regression. This is - useful when the covariates don't extend far enough into the future, or to prohibit the model from using - future values of past and / or future covariates for prediction (depending on the model's covariate - support). - output_chunk_shift - Optionally, the number of steps to shift the start of the output chunk into the future (relative to the - input chunk end). This will create a gap between the input (history of target and past covariates) and - output. If the model supports `future_covariates`, the `lags_future_covariates` are relative to the first - step in the shifted output chunk. Predictions will start `output_chunk_shift` steps after the end of the - target `series`. If `output_chunk_shift` is set, the model cannot generate autoregressive predictions - (`n > output_chunk_length`). - add_encoders - A large number of past and future covariates can be automatically generated with `add_encoders`. - This can be done by adding multiple pre-defined index encoders and/or custom user-made functions that - will be used as index encoders. Additionally, a transformer such as Darts' :class:`Scaler` can be added to - transform the generated covariates. This happens all under one hood and only needs to be specified at - model creation. - Read :meth:`SequentialEncoder ` to find out more about - ``add_encoders``. Default: ``None``. An example showing some of ``add_encoders`` features: - - .. highlight:: python - .. code-block:: python - - def encode_year(idx): - return (idx.year - 1950) / 50 - - add_encoders={ - 'cyclic': {'future': ['month']}, - 'datetime_attribute': {'future': ['hour', 'dayofweek']}, - 'position': {'past': ['relative'], 'future': ['relative']}, - 'custom': {'past': [encode_year]}, - 'transformer': Scaler(), - 'tz': 'CET' - } - .. - likelihood - Can be set to `poisson` or `quantile`. If set, the model will be probabilistic, allowing sampling at - prediction time. This will overwrite any `objective` parameter. - quantiles - Fit the model to these quantiles if the `likelihood` is set to `quantile`. - random_state - Control the randomness in the fitting procedure and for sampling. - Default: ``None``. - multi_models - If True, a separate model will be trained for each future lag to predict. If False, a single model - is trained to predict all the steps in 'output_chunk_length' (features lags are shifted back by - `output_chunk_length - n` for each step `n`). Default: True. - use_static_covariates - Whether the model should use static covariate information in case the input `series` passed to ``fit()`` - contain static covariates. If ``True``, and static covariates are available at fitting time, will enforce - that all target `series` have the same static covariate dimensionality in ``fit()`` and ``predict()``. - **kwargs - Additional keyword arguments passed to `xgb.XGBRegressor`. - - Examples - -------- - Deterministic forecasting, using past/future covariates (optional) - - >>> from darts.datasets import WeatherDataset - >>> from darts.models import XGBModel - >>> series = WeatherDataset().load() - >>> # predicting atmospheric pressure - >>> target = series['p (mbar)'][:100] - >>> # optionally, use past observed rainfall (pretending to be unknown beyond index 100) - >>> past_cov = series['rain (mm)'][:100] - >>> # optionally, use future temperatures (pretending this component is a forecast) - >>> future_cov = series['T (degC)'][:106] - >>> # predict 6 pressure values using the 12 past values of pressure and rainfall, as well as the 6 temperature - >>> # values corresponding to the forecasted period - >>> model = XGBModel( - >>> lags=12, - >>> lags_past_covariates=12, - >>> lags_future_covariates=[0,1,2,3,4,5], - >>> output_chunk_length=6, - >>> ) - >>> model.fit(target, past_covariates=past_cov, future_covariates=future_cov) - >>> pred = model.predict(6) - >>> pred.values() - array([[1005.9185 ], - [1005.8315 ], - [1005.7878 ], - [1005.72626], - [1005.7475 ], - [1005.76074]]) - """ - kwargs["random_state"] = random_state # seed for tree learner - self.kwargs = kwargs - self._median_idx = None - self._model_container = None - self.quantiles = None - self._likelihood = likelihood - self._rng = None - - # parse likelihood - available_likelihoods = ["poisson", "quantile"] # to be extended - if likelihood is not None: - self._check_likelihood(likelihood, available_likelihoods) - if likelihood in {"poisson"}: - self.kwargs["objective"] = f"count:{likelihood}" - elif likelihood == "quantile": - if xgb_200_or_above: - # leverage built-in Quantile Regression - self.kwargs["objective"] = "reg:quantileerror" - self.quantiles, self._median_idx = self._prepare_quantiles(quantiles) - self._model_container = self._get_model_container() - - self._rng = np.random.default_rng(seed=random_state) # seed for sampling - - super().__init__( - lags=lags, - lags_past_covariates=lags_past_covariates, - lags_future_covariates=lags_future_covariates, - output_chunk_length=output_chunk_length, - output_chunk_shift=output_chunk_shift, - add_encoders=add_encoders, - multi_models=multi_models, - model=xgb.XGBRFClassifier(**self.kwargs), - use_static_covariates=use_static_covariates, - ) - - def fit( - self, - series: Union[TimeSeries, Sequence[TimeSeries]], - past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - max_samples_per_ts: Optional[int] = None, - n_jobs_multioutput_wrapper: Optional[int] = None, - sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, - val_sample_weight: Optional[ - Union[TimeSeries, Sequence[TimeSeries], str] - ] = None, - **kwargs, - ): - """ - Fits/trains the model using the provided list of features time series and the target time series. - - Parameters - ---------- - series - TimeSeries or Sequence[TimeSeries] object containing the target values. - past_covariates - Optionally, a series or sequence of series specifying past-observed covariates - future_covariates - Optionally, a series or sequence of series specifying future-known covariates - val_series - TimeSeries or Sequence[TimeSeries] object containing the target values for evaluation dataset - val_past_covariates - Optionally, a series or sequence of series specifying past-observed covariates for evaluation dataset - val_future_covariates : - Optionally, a series or sequence of series specifying future-known covariates for evaluation dataset - max_samples_per_ts - This is an integer upper bound on the number of tuples that can be produced - per time series. It can be used in order to have an upper bound on the total size of the dataset and - ensure proper sampling. If `None`, it will read all of the individual time series in advance (at dataset - creation) to know their sizes, which might be expensive on big datasets. - If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the - most recent `max_samples_per_ts` samples will be considered. - n_jobs_multioutput_wrapper - Number of jobs of the MultiOutputRegressor wrapper to run in parallel. Only used if the model doesn't - support multi-output regression natively. - sample_weight - Optionally, some sample weights to apply to the target `series` labels. They are applied per observation, - per label (each step in `output_chunk_length`), and per component. - If a series or sequence of series, then those weights are used. If the weight series only have a single - component / column, then the weights are applied globally to all components in `series`. Otherwise, for - component-specific weights, the number of components must match those of `series`. - If a string, then the weights are generated using built-in weighting functions. The available options are - `"linear"` or `"exponential"` decay - the further in the past, the lower the weight. The weights are - computed globally based on the length of the longest series in `series`. Then for each series, the weights - are extracted from the end of the global weights. This gives a common time weighting across all series. - val_sample_weight - Same as for `sample_weight` but for the evaluation dataset. - **kwargs - Additional kwargs passed to `xgb.XGBRegressor.fit()` - """ - # TODO: XGBRegressor supports multi quantile reqression which we could leverage in the future - # see https://xgboost.readthedocs.io/en/latest/python/examples/quantile_regression.html - - super().fit( - series=series, - past_covariates=past_covariates, - future_covariates=future_covariates, - val_series=val_series, - val_past_covariates=val_past_covariates, - val_future_covariates=val_future_covariates, - max_samples_per_ts=max_samples_per_ts, - n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper, - sample_weight=sample_weight, - val_sample_weight=val_sample_weight, - **kwargs, - ) - return self - - -class XGBClassifierModel(RegressionModel, _LikelihoodMixin): - def __init__( - self, - lags: Optional[LAGS_TYPE] = None, - lags_past_covariates: Optional[LAGS_TYPE] = None, - lags_future_covariates: Optional[FUTURE_LAGS_TYPE] = None, - output_chunk_length: int = 1, - output_chunk_shift: int = 0, - add_encoders: Optional[dict] = None, - likelihood: Optional[str] = None, - quantiles: Optional[list[float]] = None, - random_state: Optional[int] = None, - multi_models: Optional[bool] = True, - use_static_covariates: bool = True, - **kwargs, - ): - """XGBoost Model - - Parameters - ---------- - lags - Lagged target `series` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags` past lags; e.g. `(-1, -2, ..., -lags)`, where `0` - corresponds the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `series` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_past_covariates - Lagged `past_covariates` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags_past_covariates` past lags; e.g. `(-1, -2, ..., -lags)`, - where `0` corresponds to the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `past_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_future_covariates - Lagged `future_covariates` values used to predict the next time step/s. The lags are always relative to the - first step in the output chunk, even when `output_chunk_shift > 0`. - If a tuple of `(past, future)`, both values must be > 0. Uses the last `n=past` past lags and `n=future` - future lags; e.g. `(-past, -(past - 1), ..., -1, 0, 1, .... future - 1)`, where `0` corresponds the first - predicted time step of each sample. If `output_chunk_shift > 0`, the position of negative lags differ from - those of `lags` and `lags_past_covariates`. In this case a future lag `-5` would point at the same - step as a target lag of `-5 + output_chunk_shift`. - If a list of integers, uses only the specified values as lags. - If a dictionary, the keys correspond to the `future_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (tuple or list of integers). The key - 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - output_chunk_length - Number of time steps predicted at once (per chunk) by the internal model. It is not the same as forecast - horizon `n` used in `predict()`, which is the desired number of prediction points generated using a - one-shot- or autoregressive forecast. Setting `n <= output_chunk_length` prevents auto-regression. This is - useful when the covariates don't extend far enough into the future, or to prohibit the model from using - future values of past and / or future covariates for prediction (depending on the model's covariate - support). - output_chunk_shift - Optionally, the number of steps to shift the start of the output chunk into the future (relative to the - input chunk end). This will create a gap between the input (history of target and past covariates) and - output. If the model supports `future_covariates`, the `lags_future_covariates` are relative to the first - step in the shifted output chunk. Predictions will start `output_chunk_shift` steps after the end of the - target `series`. If `output_chunk_shift` is set, the model cannot generate autoregressive predictions - (`n > output_chunk_length`). - add_encoders - A large number of past and future covariates can be automatically generated with `add_encoders`. - This can be done by adding multiple pre-defined index encoders and/or custom user-made functions that - will be used as index encoders. Additionally, a transformer such as Darts' :class:`Scaler` can be added to - transform the generated covariates. This happens all under one hood and only needs to be specified at - model creation. - Read :meth:`SequentialEncoder ` to find out more about - ``add_encoders``. Default: ``None``. An example showing some of ``add_encoders`` features: - - .. highlight:: python - .. code-block:: python - - def encode_year(idx): - return (idx.year - 1950) / 50 - - add_encoders={ - 'cyclic': {'future': ['month']}, - 'datetime_attribute': {'future': ['hour', 'dayofweek']}, - 'position': {'past': ['relative'], 'future': ['relative']}, - 'custom': {'past': [encode_year]}, - 'transformer': Scaler(), - 'tz': 'CET' - } - .. - likelihood - Can be set to `poisson` or `quantile`. If set, the model will be probabilistic, allowing sampling at - prediction time. This will overwrite any `objective` parameter. - quantiles - Fit the model to these quantiles if the `likelihood` is set to `quantile`. - random_state - Control the randomness in the fitting procedure and for sampling. - Default: ``None``. - multi_models - If True, a separate model will be trained for each future lag to predict. If False, a single model - is trained to predict all the steps in 'output_chunk_length' (features lags are shifted back by - `output_chunk_length - n` for each step `n`). Default: True. - use_static_covariates - Whether the model should use static covariate information in case the input `series` passed to ``fit()`` - contain static covariates. If ``True``, and static covariates are available at fitting time, will enforce - that all target `series` have the same static covariate dimensionality in ``fit()`` and ``predict()``. - **kwargs - Additional keyword arguments passed to `xgb.XGBRegressor`. - - Examples - -------- - Deterministic forecasting, using past/future covariates (optional) - - >>> from darts.datasets import WeatherDataset - >>> from darts.models import XGBModel - >>> series = WeatherDataset().load() - >>> # predicting atmospheric pressure - >>> target = series['p (mbar)'][:100] - >>> # optionally, use past observed rainfall (pretending to be unknown beyond index 100) - >>> past_cov = series['rain (mm)'][:100] - >>> # optionally, use future temperatures (pretending this component is a forecast) - >>> future_cov = series['T (degC)'][:106] - >>> # predict 6 pressure values using the 12 past values of pressure and rainfall, as well as the 6 temperature - >>> # values corresponding to the forecasted period - >>> model = XGBModel( - >>> lags=12, - >>> lags_past_covariates=12, - >>> lags_future_covariates=[0,1,2,3,4,5], - >>> output_chunk_length=6, - >>> ) - >>> model.fit(target, past_covariates=past_cov, future_covariates=future_cov) - >>> pred = model.predict(6) - >>> pred.values() - array([[1005.9185 ], - [1005.8315 ], - [1005.7878 ], - [1005.72626], - [1005.7475 ], - [1005.76074]]) - """ - kwargs["random_state"] = random_state # seed for tree learner - self.kwargs = kwargs - self._median_idx = None - self._model_container = None - self.quantiles = None - self._likelihood = likelihood - self._rng = None - - # parse likelihood - available_likelihoods = ["poisson", "quantile"] # to be extended - if likelihood is not None: - self._check_likelihood(likelihood, available_likelihoods) - if likelihood in {"poisson"}: - self.kwargs["objective"] = f"count:{likelihood}" - elif likelihood == "quantile": - if xgb_200_or_above: - # leverage built-in Quantile Regression - self.kwargs["objective"] = "reg:quantileerror" - self.quantiles, self._median_idx = self._prepare_quantiles(quantiles) - self._model_container = self._get_model_container() - - self._rng = np.random.default_rng(seed=random_state) # seed for sampling - - super().__init__( - lags=lags, - lags_past_covariates=lags_past_covariates, - lags_future_covariates=lags_future_covariates, - output_chunk_length=output_chunk_length, - output_chunk_shift=output_chunk_shift, - add_encoders=add_encoders, - multi_models=multi_models, - model=xgb.XGBClassifier(**self.kwargs), - use_static_covariates=use_static_covariates, - ) - - def fit( - self, - series: Union[TimeSeries, Sequence[TimeSeries]], - past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - max_samples_per_ts: Optional[int] = None, - n_jobs_multioutput_wrapper: Optional[int] = None, - sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, - val_sample_weight: Optional[ - Union[TimeSeries, Sequence[TimeSeries], str] - ] = None, - **kwargs, - ): - """ - Fits/trains the model using the provided list of features time series and the target time series. - - Parameters - ---------- - series - TimeSeries or Sequence[TimeSeries] object containing the target values. - past_covariates - Optionally, a series or sequence of series specifying past-observed covariates - future_covariates - Optionally, a series or sequence of series specifying future-known covariates - val_series - TimeSeries or Sequence[TimeSeries] object containing the target values for evaluation dataset - val_past_covariates - Optionally, a series or sequence of series specifying past-observed covariates for evaluation dataset - val_future_covariates : - Optionally, a series or sequence of series specifying future-known covariates for evaluation dataset - max_samples_per_ts - This is an integer upper bound on the number of tuples that can be produced - per time series. It can be used in order to have an upper bound on the total size of the dataset and - ensure proper sampling. If `None`, it will read all of the individual time series in advance (at dataset - creation) to know their sizes, which might be expensive on big datasets. - If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the - most recent `max_samples_per_ts` samples will be considered. - n_jobs_multioutput_wrapper - Number of jobs of the MultiOutputRegressor wrapper to run in parallel. Only used if the model doesn't - support multi-output regression natively. - sample_weight - Optionally, some sample weights to apply to the target `series` labels. They are applied per observation, - per label (each step in `output_chunk_length`), and per component. - If a series or sequence of series, then those weights are used. If the weight series only have a single - component / column, then the weights are applied globally to all components in `series`. Otherwise, for - component-specific weights, the number of components must match those of `series`. - If a string, then the weights are generated using built-in weighting functions. The available options are - `"linear"` or `"exponential"` decay - the further in the past, the lower the weight. The weights are - computed globally based on the length of the longest series in `series`. Then for each series, the weights - are extracted from the end of the global weights. This gives a common time weighting across all series. - val_sample_weight - Same as for `sample_weight` but for the evaluation dataset. - **kwargs - Additional kwargs passed to `xgb.XGBRegressor.fit()` - """ - # TODO: XGBRegressor supports multi quantile reqression which we could leverage in the future - # see https://xgboost.readthedocs.io/en/latest/python/examples/quantile_regression.html - - super().fit( - series=series, - past_covariates=past_covariates, - future_covariates=future_covariates, - val_series=val_series, - val_past_covariates=val_past_covariates, - val_future_covariates=val_future_covariates, - max_samples_per_ts=max_samples_per_ts, - n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper, - sample_weight=sample_weight, - val_sample_weight=val_sample_weight, - **kwargs, - ) - return self - - -class LightGBMClassifierModel(RegressionModelWithCategoricalCovariates, _LikelihoodMixin): - def __init__( - self, - lags: Optional[LAGS_TYPE] = None, - lags_past_covariates: Optional[LAGS_TYPE] = None, - lags_future_covariates: Optional[FUTURE_LAGS_TYPE] = None, - output_chunk_length: int = 1, - output_chunk_shift: int = 0, - add_encoders: Optional[dict] = None, - likelihood: Optional[str] = None, - quantiles: Optional[list[float]] = None, - random_state: Optional[int] = None, - multi_models: Optional[bool] = True, - use_static_covariates: bool = True, - categorical_past_covariates: Optional[Union[str, list[str]]] = None, - categorical_future_covariates: Optional[Union[str, list[str]]] = None, - categorical_static_covariates: Optional[Union[str, list[str]]] = None, - **kwargs, - ): - """LGBM Model - - Parameters - ---------- - lags - Lagged target `series` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags` past lags; e.g. `(-1, -2, ..., -lags)`, where `0` - corresponds the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `series` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_past_covariates - Lagged `past_covariates` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags_past_covariates` past lags; e.g. `(-1, -2, ..., -lags)`, - where `0` corresponds to the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `past_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_future_covariates - Lagged `future_covariates` values used to predict the next time step/s. The lags are always relative to the - first step in the output chunk, even when `output_chunk_shift > 0`. - If a tuple of `(past, future)`, both values must be > 0. Uses the last `n=past` past lags and `n=future` - future lags; e.g. `(-past, -(past - 1), ..., -1, 0, 1, .... future - 1)`, where `0` corresponds the first - predicted time step of each sample. If `output_chunk_shift > 0`, the position of negative lags differ from - those of `lags` and `lags_past_covariates`. In this case a future lag `-5` would point at the same - step as a target lag of `-5 + output_chunk_shift`. - If a list of integers, uses only the specified values as lags. - If a dictionary, the keys correspond to the `future_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (tuple or list of integers). The key - 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - output_chunk_length - Number of time steps predicted at once (per chunk) by the internal model. It is not the same as forecast - horizon `n` used in `predict()`, which is the desired number of prediction points generated using a - one-shot- or autoregressive forecast. Setting `n <= output_chunk_length` prevents auto-regression. This is - useful when the covariates don't extend far enough into the future, or to prohibit the model from using - future values of past and / or future covariates for prediction (depending on the model's covariate - support). - output_chunk_shift - Optionally, the number of steps to shift the start of the output chunk into the future (relative to the - input chunk end). This will create a gap between the input (history of target and past covariates) and - output. If the model supports `future_covariates`, the `lags_future_covariates` are relative to the first - step in the shifted output chunk. Predictions will start `output_chunk_shift` steps after the end of the - target `series`. If `output_chunk_shift` is set, the model cannot generate autoregressive predictions - (`n > output_chunk_length`). - add_encoders - A large number of past and future covariates can be automatically generated with `add_encoders`. - This can be done by adding multiple pre-defined index encoders and/or custom user-made functions that - will be used as index encoders. Additionally, a transformer such as Darts' :class:`Scaler` can be added to - transform the generated covariates. This happens all under one hood and only needs to be specified at - model creation. - Read :meth:`SequentialEncoder ` to find out more about - ``add_encoders``. Default: ``None``. An example showing some of ``add_encoders`` features: - - .. highlight:: python - .. code-block:: python - - def encode_year(idx): - return (idx.year - 1950) / 50 - - add_encoders={ - 'cyclic': {'future': ['month']}, - 'datetime_attribute': {'future': ['hour', 'dayofweek']}, - 'position': {'past': ['relative'], 'future': ['relative']}, - 'custom': {'past': [encode_year]}, - 'transformer': Scaler(), - 'tz': 'CET' - } - .. - likelihood - Can be set to `quantile` or `poisson`. If set, the model will be probabilistic, allowing sampling at - prediction time. This will overwrite any `objective` parameter. - quantiles - Fit the model to these quantiles if the `likelihood` is set to `quantile`. - random_state - Control the randomness in the fitting procedure and for sampling. - Default: ``None``. - multi_models - If True, a separate model will be trained for each future lag to predict. If False, a single model - is trained to predict all the steps in 'output_chunk_length' (features lags are shifted back by - `output_chunk_length - n` for each step `n`). Default: True. - use_static_covariates - Whether the model should use static covariate information in case the input `series` passed to ``fit()`` - contain static covariates. If ``True``, and static covariates are available at fitting time, will enforce - that all target `series` have the same static covariate dimensionality in ``fit()`` and ``predict()``. - categorical_past_covariates - Optionally, component name or list of component names specifying the past covariates that should be treated - as categorical by the underlying `lightgbm.LightGBMRegressor`. It's recommended that the components that - are treated as categorical are integer-encoded. For more information on how LightGBM handles categorical - features, visit: `Categorical feature support documentation - `_ - categorical_future_covariates - Optionally, component name or list of component names specifying the future covariates that should be - treated as categorical by the underlying `lightgbm.LightGBMRegressor`. It's recommended that the components - that are treated as categorical are integer-encoded. - categorical_static_covariates - Optionally, string or list of strings specifying the static covariates that should be treated as categorical - by the underlying `lightgbm.LightGBMRegressor`. It's recommended that the static covariates that are - treated as categorical are integer-encoded. - **kwargs - Additional keyword arguments passed to `lightgbm.LGBRegressor`. - - Examples - -------- - >>> from darts.datasets import WeatherDataset - >>> from darts.models import LightGBMModel - >>> series = WeatherDataset().load() - >>> # predicting atmospheric pressure - >>> target = series['p (mbar)'][:100] - >>> # optionally, use past observed rainfall (pretending to be unknown beyond index 100) - >>> past_cov = series['rain (mm)'][:100] - >>> # optionally, use future temperatures (pretending this component is a forecast) - >>> future_cov = series['T (degC)'][:106] - >>> # predict 6 pressure values using the 12 past values of pressure and rainfall, as well as the 6 temperature - >>> # values corresponding to the forecasted period - >>> model = LightGBMModel( - >>> lags=12, - >>> lags_past_covariates=12, - >>> lags_future_covariates=[0,1,2,3,4,5], - >>> output_chunk_length=6, - >>> verbose=-1 - >>> ) - >>> model.fit(target, past_covariates=past_cov, future_covariates=future_cov) - >>> pred = model.predict(6) - >>> pred.values() - array([[1006.85376674], - [1006.83998586], - [1006.63884831], - [1006.57201255], - [1006.52290556], - [1006.39550065]]) - """ - kwargs["random_state"] = random_state # seed for tree learner - self.kwargs = kwargs - self._median_idx = None - self._model_container = None - self.quantiles = None - self._likelihood = likelihood - self._rng = None - - # parse likelihood - available_likelihoods = ["quantile", "poisson"] # to be extended - if likelihood is not None: - self._check_likelihood(likelihood, available_likelihoods) - self.kwargs["objective"] = likelihood - self._rng = np.random.default_rng(seed=random_state) # seed for sampling - - if likelihood == "quantile": - self.quantiles, self._median_idx = self._prepare_quantiles(quantiles) - self._model_container = self._get_model_container() - - super().__init__( - lags=lags, - lags_past_covariates=lags_past_covariates, - lags_future_covariates=lags_future_covariates, - output_chunk_length=output_chunk_length, - output_chunk_shift=output_chunk_shift, - add_encoders=add_encoders, - multi_models=multi_models, - model=lgb.LGBMClassifier(**self.kwargs), - use_static_covariates=use_static_covariates, - categorical_past_covariates=categorical_past_covariates, - categorical_future_covariates=categorical_future_covariates, - categorical_static_covariates=categorical_static_covariates, - ) - - def fit( - self, - series: Union[TimeSeries, Sequence[TimeSeries]], - past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_series: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - val_future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, - max_samples_per_ts: Optional[int] = None, - n_jobs_multioutput_wrapper: Optional[int] = None, - sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, - val_sample_weight: Optional[ - Union[TimeSeries, Sequence[TimeSeries], str] - ] = None, - **kwargs, - ): - """ - Fits/trains the model using the provided list of features time series and the target time series. - - Parameters - ---------- - series - TimeSeries or Sequence[TimeSeries] object containing the target values. - past_covariates - Optionally, a series or sequence of series specifying past-observed covariates - future_covariates - Optionally, a series or sequence of series specifying future-known covariates - val_series - TimeSeries or Sequence[TimeSeries] object containing the target values for evaluation dataset - val_past_covariates - Optionally, a series or sequence of series specifying past-observed covariates for evaluation dataset - val_future_covariates : Union[TimeSeries, Sequence[TimeSeries]] - Optionally, a series or sequence of series specifying future-known covariates for evaluation dataset - max_samples_per_ts - This is an integer upper bound on the number of tuples that can be produced - per time series. It can be used in order to have an upper bound on the total size of the dataset and - ensure proper sampling. If `None`, it will read all of the individual time series in advance (at dataset - creation) to know their sizes, which might be expensive on big datasets. - If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the - most recent `max_samples_per_ts` samples will be considered. - n_jobs_multioutput_wrapper - Number of jobs of the MultiOutputRegressor wrapper to run in parallel. Only used if the model doesn't - support multi-output regression natively. - sample_weight - Optionally, some sample weights to apply to the target `series` labels. They are applied per observation, - per label (each step in `output_chunk_length`), and per component. - If a series or sequence of series, then those weights are used. If the weight series only have a single - component / column, then the weights are applied globally to all components in `series`. Otherwise, for - component-specific weights, the number of components must match those of `series`. - If a string, then the weights are generated using built-in weighting functions. The available options are - `"linear"` or `"exponential"` decay - the further in the past, the lower the weight. The weights are - computed globally based on the length of the longest series in `series`. Then for each series, the weights - are extracted from the end of the global weights. This gives a common time weighting across all series. - val_sample_weight - Same as for `sample_weight` but for the evaluation dataset. - **kwargs - Additional kwargs passed to `lightgbm.LGBRegressor.fit()` - """ - - super().fit( - series=series, - past_covariates=past_covariates, - future_covariates=future_covariates, - val_series=val_series, - val_past_covariates=val_past_covariates, - val_future_covariates=val_future_covariates, - max_samples_per_ts=max_samples_per_ts, - n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper, - sample_weight=sample_weight, - val_sample_weight=val_sample_weight, - **kwargs, - ) - return self - - -class RandomForestClassifierModel(RegressionModel): - def __init__( - self, - lags: Optional[LAGS_TYPE] = None, - lags_past_covariates: Optional[LAGS_TYPE] = None, - lags_future_covariates: Optional[FUTURE_LAGS_TYPE] = None, - output_chunk_length: int = 1, - output_chunk_shift: int = 0, - add_encoders: Optional[dict] = None, - n_estimators: Optional[int] = 100, - max_depth: Optional[int] = None, - multi_models: Optional[bool] = True, - use_static_covariates: bool = True, - **kwargs, - ): - """Random Forest Model - - Parameters - ---------- - lags - Lagged target `series` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags` past lags; e.g. `(-1, -2, ..., -lags)`, where `0` - corresponds the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `series` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_past_covariates - Lagged `past_covariates` values used to predict the next time step/s. - If an integer, must be > 0. Uses the last `n=lags_past_covariates` past lags; e.g. `(-1, -2, ..., -lags)`, - where `0` corresponds to the first predicted time step of each sample. If `output_chunk_shift > 0`, then - lag `-1` translates to `-1 - output_chunk_shift` steps before the first prediction step. - If a list of integers, each value must be < 0. Uses only the specified values as lags. - If a dictionary, the keys correspond to the `past_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (integer or list of integers). The - key 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - lags_future_covariates - Lagged `future_covariates` values used to predict the next time step/s. The lags are always relative to the - first step in the output chunk, even when `output_chunk_shift > 0`. - If a tuple of `(past, future)`, both values must be > 0. Uses the last `n=past` past lags and `n=future` - future lags; e.g. `(-past, -(past - 1), ..., -1, 0, 1, .... future - 1)`, where `0` corresponds the first - predicted time step of each sample. If `output_chunk_shift > 0`, the position of negative lags differ from - those of `lags` and `lags_past_covariates`. In this case a future lag `-5` would point at the same - step as a target lag of `-5 + output_chunk_shift`. - If a list of integers, uses only the specified values as lags. - If a dictionary, the keys correspond to the `future_covariates` component names (of the first series when - using multiple series) and the values correspond to the component lags (tuple or list of integers). The key - 'default_lags' can be used to provide default lags for un-specified components. Raises and error if some - components are missing and the 'default_lags' key is not provided. - output_chunk_length - Number of time steps predicted at once (per chunk) by the internal model. It is not the same as forecast - horizon `n` used in `predict()`, which is the desired number of prediction points generated using a - one-shot- or autoregressive forecast. Setting `n <= output_chunk_length` prevents auto-regression. This is - useful when the covariates don't extend far enough into the future, or to prohibit the model from using - future values of past and / or future covariates for prediction (depending on the model's covariate - support). - output_chunk_shift - Optionally, the number of steps to shift the start of the output chunk into the future (relative to the - input chunk end). This will create a gap between the input (history of target and past covariates) and - output. If the model supports `future_covariates`, the `lags_future_covariates` are relative to the first - step in the shifted output chunk. Predictions will start `output_chunk_shift` steps after the end of the - target `series`. If `output_chunk_shift` is set, the model cannot generate autoregressive predictions - (`n > output_chunk_length`). - add_encoders - A large number of past and future covariates can be automatically generated with `add_encoders`. - This can be done by adding multiple pre-defined index encoders and/or custom user-made functions that - will be used as index encoders. Additionally, a transformer such as Darts' :class:`Scaler` can be added to - transform the generated covariates. This happens all under one hood and only needs to be specified at - model creation. - Read :meth:`SequentialEncoder ` to find out more about - ``add_encoders``. Default: ``None``. An example showing some of ``add_encoders`` features: - - .. highlight:: python - .. code-block:: python - - def encode_year(idx): - return (idx.year - 1950) / 50 - - add_encoders={ - 'cyclic': {'future': ['month']}, - 'datetime_attribute': {'future': ['hour', 'dayofweek']}, - 'position': {'past': ['relative'], 'future': ['relative']}, - 'custom': {'past': [encode_year]}, - 'transformer': Scaler(), - 'tz': 'CET' - } - .. - n_estimators : int - The number of trees in the forest. - max_depth : int - The maximum depth of the tree. If None, then nodes are expanded until all leaves are pure or until all - leaves contain less than min_samples_split samples. - multi_models - If True, a separate model will be trained for each future lag to predict. If False, a single model - is trained to predict all the steps in 'output_chunk_length' (features lags are shifted back by - `output_chunk_length - n` for each step `n`). Default: True. - use_static_covariates - Whether the model should use static covariate information in case the input `series` passed to ``fit()`` - contain static covariates. If ``True``, and static covariates are available at fitting time, will enforce - that all target `series` have the same static covariate dimensionality in ``fit()`` and ``predict()``. - **kwargs - Additional keyword arguments passed to `sklearn.ensemble.RandomForest`. + @staticmethod + def _create_model(**kwargs): + """Create an XGBRFRegressor model.""" + return xgb.XGBRFRegressor(**kwargs) - Examples - -------- - >>> from darts.datasets import WeatherDataset - >>> from darts.models import RandomForest - >>> series = WeatherDataset().load() - >>> # predicting atmospheric pressure - >>> target = series['p (mbar)'][:100] - >>> # optionally, use past observed rainfall (pretending to be unknown beyond index 100) - >>> past_cov = series['rain (mm)'][:100] - >>> # optionally, use future temperatures (pretending this component is a forecast) - >>> future_cov = series['T (degC)'][:106] - >>> # random forest with 200 trees trained with MAE - >>> model = RandomForest( - >>> lags=12, - >>> lags_past_covariates=12, - >>> lags_future_covariates=[0,1,2,3,4,5], - >>> output_chunk_length=6, - >>> n_estimators=200, - >>> criterion="absolute_error", - >>> ) - >>> model.fit(target, past_covariates=past_cov, future_covariates=future_cov) - >>> pred = model.predict(6) - >>> pred.values() - array([[1006.29805], - [1006.23675], - [1006.17325], - [1006.10295], - [1006.06505], - [1006.05465]]) - """ - self.n_estimators = n_estimators - self.max_depth = max_depth - self.kwargs = kwargs - self.kwargs["n_estimators"] = self.n_estimators - self.kwargs["max_depth"] = self.max_depth +class XGBRFClassifierModel(XGBClassifierModel): + """ + A Darts wrapper for the XGBoost Random Forest Classifier. + + This class inherits all functionality from darts.models.XGBClassifierModel + and simply replaces the underlying model with + `xgb.XGBRFClassifier`. + """ - super().__init__( - lags=lags, - lags_past_covariates=lags_past_covariates, - lags_future_covariates=lags_future_covariates, - output_chunk_length=output_chunk_length, - output_chunk_shift=output_chunk_shift, - add_encoders=add_encoders, - multi_models=multi_models, - model=RandomForestClassifier(**kwargs), - use_static_covariates=use_static_covariates, - ) \ No newline at end of file + @staticmethod + def _create_model(**kwargs): + """Create an XGBRFClassifier model.""" + return xgb.XGBRFClassifier(**kwargs) \ No newline at end of file diff --git a/views_stepshifter/models/hurdle_model.py b/views_stepshifter/models/hurdle_model.py index bf60c40..cfe8f5d 100644 --- a/views_stepshifter/models/hurdle_model.py +++ b/views_stepshifter/models/hurdle_model.py @@ -42,28 +42,22 @@ def _resolve_clf_model(self, func_name: str): match func_name: case "XGBClassifier": - from views_stepshifter.models.darts_model import XGBClassifierModel - if self.get_device_params().get("device") == "cuda": - logger.info("\033[92mUsing CUDA for XGBClassifierModel\033[0m") - cuda_params = {"tree_method": "hist", "device": "cuda"} - return partial(XGBClassifierModel, **cuda_params) + from darts.models import XGBClassifierModel + # if self.get_device_params().get("device") == "cuda": + # logger.info("\033[92mUsing CUDA for XGBClassifierModel\033[0m") + # cuda_params = {"tree_method": "hist", "device": "cuda"} + # return partial(XGBClassifierModel, **cuda_params) return XGBClassifierModel case "XGBRFClassifier": from views_stepshifter.models.darts_model import XGBRFClassifierModel - if self.get_device_params().get("device") == "cuda": - logger.info("\033[92mUsing CUDA for XGBRFClassifierModel\033[0m") - cuda_params = {"tree_method": "hist", "device": "cuda"} - return partial(XGBRFClassifierModel, **cuda_params) + # if self.get_device_params().get("device") == "cuda": + # logger.info("\033[92mUsing CUDA for XGBRFClassifierModel\033[0m") + # cuda_params = {"tree_method": "hist", "device": "cuda"} + # return partial(XGBRFClassifierModel, **cuda_params) return XGBRFClassifierModel case "LGBMClassifier": - from views_stepshifter.models.darts_model import LightGBMClassifierModel + from darts.models import LightGBMClassifierModel return LightGBMClassifierModel - case "RandomForestClassifier": - from views_stepshifter.models.darts_model import ( - RandomForestClassifierModel, - ) - - return RandomForestClassifierModel case _: raise ValueError( f"Model {func_name} is not a valid Darts forecasting model or is not supported now. " diff --git a/views_stepshifter/models/stepshifter.py b/views_stepshifter/models/stepshifter.py index 264d97b..43763e9 100644 --- a/views_stepshifter/models/stepshifter.py +++ b/views_stepshifter/models/stepshifter.py @@ -6,7 +6,7 @@ from sklearn.utils.validation import check_is_fitted from typing import List, Dict from views_stepshifter.models.validation import views_validate -from views_pipeline_core.managers.model import ModelManager, ForecastingModelManager +from views_pipeline_core.managers.model import ForecastingModelManager import tqdm from concurrent.futures import ProcessPoolExecutor, as_completed import torch @@ -44,23 +44,23 @@ def get_device_params(): def _resolve_reg_model(self, func_name: str): """ Lookup table for supported regression models - views_stepshifter.models.darts_model are custom models that inherit from darts.models + Note that stepshifter doesn't support cuda for now before we figure out how to move the input data to the GPU. (fit works intelligently but predict doesn't) """ match func_name: case "XGBRFRegressor": from views_stepshifter.models.darts_model import XGBRFModel - if self.get_device_params().get("device") == "cuda": - logger.info("\033[92mUsing CUDA for XGBRFRegressor\033[0m") - cuda_params = {"tree_method": "hist", "device": "cuda"} - return partial(XGBRFModel, **cuda_params) + # if self.get_device_params().get("device") == "cuda": + # logger.info("\033[92mUsing CUDA for XGBRFRegressor\033[0m") + # cuda_params = {"tree_method": "hist", "device": "cuda"} + # return partial(XGBRFModel, **cuda_params) return XGBRFModel case "XGBRegressor": from darts.models import XGBModel - if self.get_device_params().get("device") == "cuda": - logger.info("\033[92mUsing CUDA for XGBRegressor\033[0m") - cuda_params = {"tree_method": "hist", "device": "cuda"} - return partial(XGBModel, **cuda_params) + # if self.get_device_params().get("device") == "cuda": + # logger.info("\033[92mUsing CUDA for XGBRegressor\033[0m") + # cuda_params = {"tree_method": "hist", "device": "cuda"} + # return partial(XGBModel, **cuda_params) return XGBModel case "LGBMRegressor": from darts.models import LightGBMModel @@ -69,10 +69,6 @@ def _resolve_reg_model(self, func_name: str): # cuda_params = {"device": "cuda"} # return partial(LightGBMModel, **cuda_params) return LightGBMModel - case "RandomForestRegressor": - from darts.models import RandomForest - - return RandomForest case _: raise ValueError( f"Model {func_name} is not a valid forecasting model or is not supported now. " @@ -165,7 +161,7 @@ def _predict_by_step(self, model, step: int, sequence_number: int): # process the predictions index_tuples, df_list = [], [] for pred in ts_pred: - df_pred = pred.pd_dataframe().loc[ + df_pred = pred.to_dataframe().loc[ [self._test_start + step + sequence_number - 1] ] level = int(pred.static_covariates.iat[0, 0]) From 95fa194c7006c5a102351b84de9f38b48b045b7e Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Mon, 17 Nov 2025 15:09:51 +0100 Subject: [PATCH 07/14] update tests --- tests/test_hurdle_model.py | 4 +- tests/test_stepshifter_manager.py | 149 +++++++++++++++++++++--------- 2 files changed, 107 insertions(+), 46 deletions(-) diff --git a/tests/test_hurdle_model.py b/tests/test_hurdle_model.py index 734276b..f121a31 100644 --- a/tests/test_hurdle_model.py +++ b/tests/test_hurdle_model.py @@ -10,8 +10,8 @@ def sample_config(): return { "steps": [1, 2, 3], "targets": ["target"], - "model_clf": "RandomForestClassifier", - "model_reg": "RandomForestRegressor", + "model_clf": "LGBMClassifier", + "model_reg": "LGBMRegressor", "parameters": {"clf": {"n_estimators": 100, "max_depth": 10}, "reg": {}}, "sweep": False, "metrics": ["test_metric"] diff --git a/tests/test_stepshifter_manager.py b/tests/test_stepshifter_manager.py index fb56a2d..bcc9e67 100644 --- a/tests/test_stepshifter_manager.py +++ b/tests/test_stepshifter_manager.py @@ -6,12 +6,21 @@ from views_stepshifter.manager.stepshifter_manager import StepshifterManager from views_stepshifter.models.stepshifter import StepshifterModel from views_pipeline_core.managers.model import ModelPathManager +from views_pipeline_core.managers.configuration.configuration import ConfigurationManager +from views_pipeline_core.cli.args import ForecastingModelArgs + @pytest.fixture def mock_model_path(): - mock_path = MagicMock() + mock_path = MagicMock(spec=ModelPathManager) mock_path.model_dir = "/path/to/models/test_model" mock_path.target = "model" + mock_path.artifacts = Path("/path/to/artifacts") + mock_path.get_latest_model_artifact_path.return_value = Path("predictions_test_run_202401011200000") + mock_path.logging = MagicMock() + mock_path.models = Path("/path/to/models_root") + mock_path.model_name = "test_model" + mock_path.data_raw = Path("/path/to/data_raw") return mock_path @pytest.fixture @@ -81,26 +90,51 @@ def mock_partitioner_dict(): @pytest.fixture def stepshifter_manager(mock_model_path, mock_config_meta, mock_config_deployment, mock_config_hyperparameters, mock_config_sweep, mock_partitioner_dict): + """ + Provides a StepshifterManager instance for a non-hurdle model. + + It patches: + - _ModelManager__load_config: To inject mock config dictionaries. + - validate_config: To prevent validation errors during test setup. + """ with patch.object(StepshifterManager, '_ModelManager__load_config', side_effect=lambda file, func: { "config_meta.py": mock_config_meta, "config_deployment.py": mock_config_deployment, "config_hyperparameters.py": mock_config_hyperparameters, "config_sweep.py": mock_config_sweep - }.get(file, None)): + }.get(file, None)), \ + patch("views_pipeline_core.managers.configuration.configuration.validate_config"): + manager = StepshifterManager(mock_model_path, use_prediction_store=False) - print(manager._data_loader) - return manager + + manager._data_loader = MagicMock() + manager._data_loader.partition_dict = mock_partitioner_dict + + yield manager @pytest.fixture -def stepshifter_manager_hurdle(mock_model_path, mock_config_meta_hurdle, mock_config_deployment, mock_config_hyperparameters_hurdle, mock_config_sweep): +def stepshifter_manager_hurdle(mock_model_path, mock_config_meta_hurdle, mock_config_deployment, mock_config_hyperparameters_hurdle, mock_config_sweep, mock_partitioner_dict): + """ + Provides a StepshifterManager instance for a hurdle model. + + It patches: + - _ModelManager__load_config: To inject mock config dictionaries. + - validate_config: To prevent validation errors during test setup. + """ with patch.object(StepshifterManager, '_ModelManager__load_config', side_effect=lambda file, func: { "config_meta.py": mock_config_meta_hurdle, "config_deployment.py": mock_config_deployment, "config_hyperparameters.py": mock_config_hyperparameters_hurdle, "config_sweep.py": mock_config_sweep - }.get(file, None)): + }.get(file, None)), \ + patch("views_pipeline_core.managers.configuration.configuration.validate_config"): + manager = StepshifterManager(mock_model_path, use_prediction_store=False) - return manager + + manager._data_loader = MagicMock() + manager._data_loader.partition_dict = mock_partitioner_dict + + yield manager def test_stepshifter_manager_init_hurdle(stepshifter_manager_hurdle): """ @@ -147,7 +181,8 @@ def test_split_hurdle_parameters(stepshifter_manager_hurdle): """ Test the _split_hurdle_parameters method to ensure it correctly splits the parameters for HurdleModel. """ - stepshifter_manager_hurdle.config = { + stepshifter_manager_hurdle.configs = { + "algorithm": "HurdleModel", "clf_param1": "value1", "clf_param2": "value2", "reg_param1": "value3", @@ -164,7 +199,14 @@ def test_get_model(stepshifter_manager, stepshifter_manager_hurdle, mock_partiti with patch("views_stepshifter.manager.stepshifter_manager.HurdleModel") as mock_hurdle_model, \ patch("views_stepshifter.manager.stepshifter_manager.StepshifterModel") as mock_stepshifter_model: - stepshifter_manager_hurdle.config = stepshifter_manager_hurdle._update_single_config(MagicMock(run_type="test_run_type")) + # --- Test Hurdle --- + args = ForecastingModelArgs(run_type="test_run_type", saved=True) + + # We must include the "algorithm" key, otherwise _is_hurdle gets reset to False + hurdle_args = vars(args) + hurdle_args["algorithm"] = "HurdleModel" + stepshifter_manager_hurdle.configs = hurdle_args + stepshifter_manager_hurdle._get_model(mock_partitioner_dict) mock_hurdle_model.assert_called_once_with(stepshifter_manager_hurdle.config, mock_partitioner_dict) mock_stepshifter_model.assert_not_called() @@ -172,7 +214,12 @@ def test_get_model(stepshifter_manager, stepshifter_manager_hurdle, mock_partiti mock_hurdle_model.reset_mock() mock_stepshifter_model.reset_mock() - stepshifter_manager.config = stepshifter_manager._update_single_config(MagicMock(run_type="test_run_type")) + # --- Test Non-Hurdle --- + args = ForecastingModelArgs(run_type="test_run_type", saved=True) + non_hurdle_args = vars(args) + non_hurdle_args["algorithm"] = "LightGBMModel" + stepshifter_manager.configs = non_hurdle_args + stepshifter_manager._get_model(mock_partitioner_dict) mock_stepshifter_model.assert_called_once_with(stepshifter_manager.config, mock_partitioner_dict) mock_hurdle_model.assert_not_called() @@ -185,9 +232,13 @@ def test_train_model_artifact(stepshifter_manager, stepshifter_manager_hurdle): patch("views_stepshifter.manager.stepshifter_manager.read_dataframe") as mock_read_dataframe, \ patch("views_stepshifter.manager.stepshifter_manager.StepshifterManager._get_model") as mock_get_model: - # StepshifterManager with StepshifterModel - stepshifter_manager.config = stepshifter_manager._update_single_config(MagicMock(run_type="test_run_type")) - stepshifter_manager.config["sweep"] = False + # --- Test Non-Hurdle --- + args = ForecastingModelArgs(run_type="test_run_type", train=True) + non_hurdle_args = vars(args) + non_hurdle_args["algorithm"] = "LightGBMModel" + non_hurdle_args["sweep"] = False + stepshifter_manager.configs = non_hurdle_args + stepshifter_manager._train_model_artifact() mock_split_hurdle.assert_not_called() @@ -197,11 +248,22 @@ def test_train_model_artifact(stepshifter_manager, stepshifter_manager_hurdle): mock_get_model.return_value.fit.assert_called_once() mock_get_model.return_value.save.assert_called_once() - # StepshifterManager with HurdleModel - stepshifter_manager_hurdle.config = stepshifter_manager_hurdle._update_single_config(MagicMock(run_type="test_run_type")) + mock_read_dataframe.reset_mock() + mock_get_model.reset_mock() + + mock_split_hurdle.reset_mock() + + # --- Test Hurdle --- + args = ForecastingModelArgs(run_type="test_run_type", train=True) + hurdle_args = vars(args) + hurdle_args["algorithm"] = "HurdleModel" + stepshifter_manager_hurdle.configs = hurdle_args + stepshifter_manager_hurdle._is_hurdle = True + stepshifter_manager_hurdle._train_model_artifact() - mock_split_hurdle.assert_called_once() + mock_read_dataframe.assert_called_once() + mock_get_model.assert_called_once_with(stepshifter_manager_hurdle._data_loader.partition_dict) def test_evaluate_model_artifact(stepshifter_manager): """ @@ -215,9 +277,10 @@ def test_evaluate_model_artifact(stepshifter_manager): patch.object(StepshifterManager, "_get_standardized_df", return_value="standardized_df") as mock_get_standardized_df: - # the else branch - stepshifter_manager._model_path.get_latest_model_artifact_path.return_value = Path("predictions_test_run_202401011200000") - stepshifter_manager.config = stepshifter_manager._update_single_config(MagicMock(run_type="test_run_type")) + # --- Test default artifact branch (else) --- + args = ForecastingModelArgs(run_type="test_run_type", evaluate=True, saved=True) + stepshifter_manager.configs = vars(args) + eval_type = "test_eval_type" artifact_name = None stepshifter_manager._evaluate_model_artifact(eval_type, artifact_name) @@ -225,20 +288,19 @@ def test_evaluate_model_artifact(stepshifter_manager): assert stepshifter_manager.config["run_type"] == "test_run_type" mock_logger.info.assert_called_once_with(f"Using latest (default) run type (test_run_type) specific artifact") assert stepshifter_manager.config["timestamp"] == "202401011200000" - # mock_read_dataframe.assert_called_once() mock_get_standardized_df.assert_called_once() mock_logger.reset_mock() - - # the if branch + # --- Test specific artifact branch (if) --- artifact_name = "non_default_artifact.pkl" - stepshifter_manager._model_path.artifacts = Path("predictions_test_run_202401011200000") + expected_path = stepshifter_manager._model_path.artifacts / artifact_name + stepshifter_manager._evaluate_model_artifact(eval_type, artifact_name) mock_logger.info.assert_called_once_with(f"Using (non-default) artifact: {artifact_name}") - path_artifact = stepshifter_manager._model_path.artifacts / artifact_name - assert path_artifact == Path("predictions_test_run_202401011200000/non_default_artifact.pkl") + assert expected_path == Path("/path/to/artifacts/non_default_artifact.pkl") + def test_forecast_model_artifact(stepshifter_manager): """ @@ -252,35 +314,34 @@ def test_forecast_model_artifact(stepshifter_manager): patch.object(StepshifterManager, "_get_standardized_df", return_value="standardized_df") as mock_get_standardized_df: - # the else branch - # mock_read_dataframe.return_value = pd.DataFrame({"a": [1, 2, 3]}) - stepshifter_manager._model_path.get_latest_model_artifact_path.return_value = Path("predictions_test_run_202401011200000") - stepshifter_manager.config = stepshifter_manager._update_single_config(MagicMock(run_type="test_run_type")) + # --- Test default artifact branch (else) --- + args = ForecastingModelArgs(run_type="forecasting", forecast=True, saved=True) + stepshifter_manager.configs = vars(args) + artifact_name = None stepshifter_manager._forecast_model_artifact(artifact_name) - assert stepshifter_manager.config["run_type"] == "test_run_type" - mock_logger.info.assert_called_once_with(f"Using latest (default) run type (test_run_type) specific artifact") - assert stepshifter_manager.config["timestamp"] == "202401011200000" - # mock_read_dataframe.assert_called_once() - mock_model.predict.assert_called_once_with("test_run_type") + assert stepshifter_manager.config["run_type"] == "forecasting" + mock_logger.info.assert_called_once_with(f"Using latest (default) run type (forecasting) specific artifact") + mock_model.predict.assert_called_once_with("forecasting") mock_get_standardized_df.assert_called_once() mock_logger.reset_mock() + mock_model.predict.reset_mock() + mock_get_standardized_df.reset_mock() - - # the if branch + # --- Test specific artifact branch (if) with FileNotFoundError --- mock_builtins_open.side_effect = FileNotFoundError("Test error") artifact_name = "non_default_artifact.pkl" - stepshifter_manager._model_path.artifacts = Path("predictions_test_run_202401011200000") + with pytest.raises(FileNotFoundError) as exc_info: stepshifter_manager._forecast_model_artifact(artifact_name) - assert str(exc_info.value) == "Test error" + assert str(exc_info.value) == "Test error" mock_logger.info.assert_called_once_with(f"Using (non-default) artifact: {artifact_name}") - path_artifact = stepshifter_manager._model_path.artifacts / artifact_name - assert path_artifact == Path("predictions_test_run_202401011200000/non_default_artifact.pkl") + path_artifact = stepshifter_manager._model_path.artifacts / artifact_name + assert path_artifact == Path("/path/to/artifacts/non_default_artifact.pkl") mock_logger.exception.assert_called_once_with(f"Model artifact not found at {path_artifact}") def test_evaluate_sweep(stepshifter_manager): @@ -292,12 +353,12 @@ def test_evaluate_sweep(stepshifter_manager): with patch("views_stepshifter.manager.stepshifter_manager.read_dataframe") as mock_read_dataframe, \ patch.object(StepshifterManager, "_get_standardized_df", return_value="standardized_df") as mock_get_standardized_df: - # mock_read_dataframe.return_value = pd.DataFrame({"a": [1, 2, 3]}) - stepshifter_manager.config = stepshifter_manager._update_single_config(MagicMock(run_type="test_run_type")) + args = ForecastingModelArgs(run_type="test_run_type", evaluate=True, saved=True) + stepshifter_manager.configs = vars(args) + eval_type = "test_eval_type" stepshifter_manager._evaluate_sweep(eval_type, mock_model) assert stepshifter_manager.config["run_type"] == "test_run_type" - # mock_read_dataframe.assert_called_once() mock_model.predict.assert_called_once_with("test_run_type", eval_type) - mock_get_standardized_df.assert_called_once() + mock_get_standardized_df.assert_called_once() \ No newline at end of file From c82472e2f22f492b301c222714d6c35a7a7671db Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Mon, 17 Nov 2025 15:10:04 +0100 Subject: [PATCH 08/14] update toml file --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d5006e3..36a8b33 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ views_pipeline_core = ">=2.0.0,<3.0.0" scikit-learn = "^1.6.0" pandas = "^1.5.3" numpy = "^1.25.2" -darts = "^0.30.0" +darts = "^0.38.0" lightgbm = "4.6.0" views_forecasts = "^0.5.5" scipy = "1.15.1" # error with latest scipy 1.16.0. see https://github.com/statsmodels/statsmodels/issues?q=_lazywhere From 5fcfe4330701c8f581969ba847e49a17277828d5 Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Thu, 20 Nov 2025 12:31:51 +0100 Subject: [PATCH 09/14] log and unlog inside stepshifter --- views_stepshifter/models/stepshifter.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/views_stepshifter/models/stepshifter.py b/views_stepshifter/models/stepshifter.py index 43763e9..99056bb 100644 --- a/views_stepshifter/models/stepshifter.py +++ b/views_stepshifter/models/stepshifter.py @@ -112,6 +112,7 @@ def _process_data(self, df: pd.DataFrame): missing_df = pd.DataFrame(0, index=missing_combinations, columns=df.columns) df = pd.concat([df, missing_df]).sort_index() + df[self._targets] = np.log1p(df[self._targets]) # Calculates log(1 + x). return df def _prepare_time_series(self, df: pd.DataFrame): @@ -176,6 +177,8 @@ def _predict_by_step(self, model, step: int, sequence_number: int): columns=[f"pred_{self._targets}"], ) + df_preds[f"pred_{self._targets}"] = np.expm1(df_preds[f"pred_{self._targets}"]) # Calculates exp(x) - 1 + return df_preds.sort_index() def _predict_by_sequence(self, sequence_number): From 428d5484358c6c37875d022925fc3dc8f96b71cc Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Mon, 24 Nov 2025 15:32:18 +0100 Subject: [PATCH 10/14] update test --- tests/test_stepshifter_manager.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/test_stepshifter_manager.py b/tests/test_stepshifter_manager.py index bcc9e67..3bbd8f1 100644 --- a/tests/test_stepshifter_manager.py +++ b/tests/test_stepshifter_manager.py @@ -16,7 +16,7 @@ def mock_model_path(): mock_path.model_dir = "/path/to/models/test_model" mock_path.target = "model" mock_path.artifacts = Path("/path/to/artifacts") - mock_path.get_latest_model_artifact_path.return_value = Path("predictions_test_run_202401011200000") + mock_path.get_latest_model_artifact_path.return_value = Path("test_model_202401011_200000") mock_path.logging = MagicMock() mock_path.models = Path("/path/to/models_root") mock_path.model_name = "test_model" @@ -208,7 +208,7 @@ def test_get_model(stepshifter_manager, stepshifter_manager_hurdle, mock_partiti stepshifter_manager_hurdle.configs = hurdle_args stepshifter_manager_hurdle._get_model(mock_partitioner_dict) - mock_hurdle_model.assert_called_once_with(stepshifter_manager_hurdle.config, mock_partitioner_dict) + mock_hurdle_model.assert_called_once_with(stepshifter_manager_hurdle.configs, mock_partitioner_dict) mock_stepshifter_model.assert_not_called() mock_hurdle_model.reset_mock() @@ -221,7 +221,7 @@ def test_get_model(stepshifter_manager, stepshifter_manager_hurdle, mock_partiti stepshifter_manager.configs = non_hurdle_args stepshifter_manager._get_model(mock_partitioner_dict) - mock_stepshifter_model.assert_called_once_with(stepshifter_manager.config, mock_partitioner_dict) + mock_stepshifter_model.assert_called_once_with(stepshifter_manager.configs, mock_partitioner_dict) mock_hurdle_model.assert_not_called() def test_train_model_artifact(stepshifter_manager, stepshifter_manager_hurdle): @@ -242,7 +242,7 @@ def test_train_model_artifact(stepshifter_manager, stepshifter_manager_hurdle): stepshifter_manager._train_model_artifact() mock_split_hurdle.assert_not_called() - assert stepshifter_manager.config["run_type"] == "test_run_type" + assert stepshifter_manager.configs["run_type"] == "test_run_type" mock_read_dataframe.assert_called_once() mock_get_model.assert_called_once_with(stepshifter_manager._data_loader.partition_dict) mock_get_model.return_value.fit.assert_called_once() @@ -285,9 +285,8 @@ def test_evaluate_model_artifact(stepshifter_manager): artifact_name = None stepshifter_manager._evaluate_model_artifact(eval_type, artifact_name) - assert stepshifter_manager.config["run_type"] == "test_run_type" + assert stepshifter_manager.configs["run_type"] == "test_run_type" mock_logger.info.assert_called_once_with(f"Using latest (default) run type (test_run_type) specific artifact") - assert stepshifter_manager.config["timestamp"] == "202401011200000" mock_get_standardized_df.assert_called_once() mock_logger.reset_mock() @@ -321,7 +320,7 @@ def test_forecast_model_artifact(stepshifter_manager): artifact_name = None stepshifter_manager._forecast_model_artifact(artifact_name) - assert stepshifter_manager.config["run_type"] == "forecasting" + assert stepshifter_manager.configs["run_type"] == "forecasting" mock_logger.info.assert_called_once_with(f"Using latest (default) run type (forecasting) specific artifact") mock_model.predict.assert_called_once_with("forecasting") mock_get_standardized_df.assert_called_once() @@ -359,6 +358,6 @@ def test_evaluate_sweep(stepshifter_manager): eval_type = "test_eval_type" stepshifter_manager._evaluate_sweep(eval_type, mock_model) - assert stepshifter_manager.config["run_type"] == "test_run_type" + assert stepshifter_manager.configs["run_type"] == "test_run_type" mock_model.predict.assert_called_once_with("test_run_type", eval_type) mock_get_standardized_df.assert_called_once() \ No newline at end of file From 15dc5a536fc3ccd0fa7b73908b587332be7dc541 Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Mon, 24 Nov 2025 15:32:27 +0100 Subject: [PATCH 11/14] small fix --- pyproject.toml | 1 + views_stepshifter/manager/stepshifter_manager.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 36a8b33..60d1e6a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ numpy = "^1.25.2" darts = "^0.38.0" lightgbm = "4.6.0" views_forecasts = "^0.5.5" +xgboost = "^3.0.0" scipy = "1.15.1" # error with latest scipy 1.16.0. see https://github.com/statsmodels/statsmodels/issues?q=_lazywhere diff --git a/views_stepshifter/manager/stepshifter_manager.py b/views_stepshifter/manager/stepshifter_manager.py index dcdf16f..2c4048c 100644 --- a/views_stepshifter/manager/stepshifter_manager.py +++ b/views_stepshifter/manager/stepshifter_manager.py @@ -153,7 +153,7 @@ def _evaluate_model_artifact( ) path_artifact = self._model_path.get_latest_model_artifact_path(run_type) - self.configs = {"timestamp": path_artifact.stem[-15:]} + self.configs['timestamp'] = path_artifact.stem[-15:] try: with open(path_artifact, "rb") as f: @@ -196,7 +196,7 @@ def _forecast_model_artifact(self, artifact_name: str) -> pd.DataFrame: ) path_artifact = self._model_path.get_latest_model_artifact_path(run_type) - self.configs = {"timestamp": path_artifact.stem[-15:]} + self.configs['timestamp'] = path_artifact.stem[-15:] try: with open(path_artifact, "rb") as f: From 46fed0dd9a7a2f8c91048dfa95d979ca6909d246 Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Wed, 26 Nov 2025 12:54:29 +0100 Subject: [PATCH 12/14] emergency fix --- views_stepshifter/manager/stepshifter_manager.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/views_stepshifter/manager/stepshifter_manager.py b/views_stepshifter/manager/stepshifter_manager.py index 2c4048c..c0f1679 100644 --- a/views_stepshifter/manager/stepshifter_manager.py +++ b/views_stepshifter/manager/stepshifter_manager.py @@ -153,7 +153,7 @@ def _evaluate_model_artifact( ) path_artifact = self._model_path.get_latest_model_artifact_path(run_type) - self.configs['timestamp'] = path_artifact.stem[-15:] + self.configs = {"timestamp": path_artifact.stem[-15:]} try: with open(path_artifact, "rb") as f: @@ -191,12 +191,12 @@ def _forecast_model_artifact(self, artifact_name: str) -> pd.DataFrame: path_artifact = path_artifacts / artifact_name else: # use the latest model artifact based on the run type + path_artifact = self._model_path.get_latest_model_artifact_path(run_type) logger.info( - f"Using latest (default) run type ({run_type}) specific artifact" + f"Using latest (default) run type ({run_type}) specific artifact {path_artifact.name}" ) - path_artifact = self._model_path.get_latest_model_artifact_path(run_type) - - self.configs['timestamp'] = path_artifact.stem[-15:] + + self.configs = {"timestamp": path_artifact.stem[-15:]} try: with open(path_artifact, "rb") as f: From f3054f30fd25c56d5b9b009ba3c796decc37393e Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Thu, 11 Dec 2025 14:23:55 +0100 Subject: [PATCH 13/14] improved logging info --- tests/test_stepshifter_manager.py | 2 -- views_stepshifter/manager/stepshifter_manager.py | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_stepshifter_manager.py b/tests/test_stepshifter_manager.py index 3bbd8f1..1b78f76 100644 --- a/tests/test_stepshifter_manager.py +++ b/tests/test_stepshifter_manager.py @@ -286,7 +286,6 @@ def test_evaluate_model_artifact(stepshifter_manager): stepshifter_manager._evaluate_model_artifact(eval_type, artifact_name) assert stepshifter_manager.configs["run_type"] == "test_run_type" - mock_logger.info.assert_called_once_with(f"Using latest (default) run type (test_run_type) specific artifact") mock_get_standardized_df.assert_called_once() mock_logger.reset_mock() @@ -321,7 +320,6 @@ def test_forecast_model_artifact(stepshifter_manager): stepshifter_manager._forecast_model_artifact(artifact_name) assert stepshifter_manager.configs["run_type"] == "forecasting" - mock_logger.info.assert_called_once_with(f"Using latest (default) run type (forecasting) specific artifact") mock_model.predict.assert_called_once_with("forecasting") mock_get_standardized_df.assert_called_once() diff --git a/views_stepshifter/manager/stepshifter_manager.py b/views_stepshifter/manager/stepshifter_manager.py index c0f1679..4f71bee 100644 --- a/views_stepshifter/manager/stepshifter_manager.py +++ b/views_stepshifter/manager/stepshifter_manager.py @@ -148,10 +148,10 @@ def _evaluate_model_artifact( path_artifact = path_artifacts / artifact_name else: # use the latest model artifact based on the run type + path_artifact = self._model_path.get_latest_model_artifact_path(run_type) logger.info( - f"Using latest (default) run type ({run_type}) specific artifact" + f"Using latest (default) run type ({run_type}) specific artifact {path_artifact.name}" ) - path_artifact = self._model_path.get_latest_model_artifact_path(run_type) self.configs = {"timestamp": path_artifact.stem[-15:]} From 1f800af8c757e65932ab4e0669974c4566d6dc9f Mon Sep 17 00:00:00 2001 From: xiaolongsun <95378566+xiaolong0728@users.noreply.github.com> Date: Thu, 11 Dec 2025 14:32:28 +0100 Subject: [PATCH 14/14] update toml file --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 60d1e6a..2642e8a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "views_stepshifter" -version = "1.0.2" +version = "1.1.0" description = "" authors = [ "Xiaolong Sun ",