diff --git a/common_utils/hurdle_model.py b/common_utils/hurdle_model.py index 3e14f19f..03785e68 100644 --- a/common_utils/hurdle_model.py +++ b/common_utils/hurdle_model.py @@ -7,12 +7,9 @@ from sklearn.utils.estimator_checks import check_estimator from sklearn.utils.validation import check_X_y, check_array, check_is_fitted from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor -from sklearn.ensemble import RandomForestRegressor -from sklearn.ensemble import RandomForestClassifier -from sklearn.ensemble import HistGradientBoostingRegressor -from sklearn.ensemble import HistGradientBoostingClassifier -from xgboost import XGBRegressor -from xgboost import XGBClassifier +from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier +from sklearn.ensemble import HistGradientBoostingRegressor, HistGradientBoostingClassifier +from xgboost import XGBRegressor, XGBClassifier from xgboost import XGBRFRegressor, XGBRFClassifier from lightgbm import LGBMClassifier, LGBMRegressor @@ -105,5 +102,4 @@ def predict(self, X: Union[np.ndarray, pd.DataFrame]): return self.clf_.predict_proba(X)[:, 1] * self.reg_.predict(X) - - + \ No newline at end of file diff --git a/common_utils/utils_cli_parser.py b/common_utils/utils_cli_parser.py index cbc8f261..8775b783 100644 --- a/common_utils/utils_cli_parser.py +++ b/common_utils/utils_cli_parser.py @@ -1,3 +1,4 @@ + import sys import argparse @@ -32,6 +33,13 @@ def parse_args(): 'Note: If --sweep is specified, --evaluate will also automatically be flagged. ' 'Cannot be used with --run_type forecasting.') + parser.add_argument('-f', '--forecast', + action='store_true', + help='Flag to indicate if the model should produce predictions. ' + 'Note: If --sweep is specified, --forecast will also automatically be flagged. ' + 'Can only be used with --run_type forecasting.') + + parser.add_argument('-a', '--artifact_name', type=str, help='Specify the name of the model artifact to be used for evaluation. ' @@ -43,27 +51,32 @@ def parse_args(): return parser.parse_args() def validate_arguments(args): - if args.sweep: - if args.run_type != 'calibration': - print("Error: Sweep runs must have --run_type set to 'calibration'. Exiting.") - print("To fix: Use --run_type calibration when --sweep is flagged.") - sys.exit(1) - - if args.run_type in ['testing', 'forecasting'] and args.sweep: - print("Error: Sweep cannot be performed with testing or forecasting run types. Exiting.") - print("To fix: Remove --sweep flag or set --run_type to 'calibration'.") + if args.sweep and args.run_type != 'calibration': + print("Error: Sweep runs must have --run_type set to 'calibration'. Exiting.") + print("To fix: Use --run_type calibration when --sweep is flagged.") sys.exit(1) - if args.run_type == 'forecasting' and args.evaluate: + if args.evaluate and args.run_type == 'forecasting': print("Error: Forecasting runs cannot evaluate. Exiting.") print("To fix: Remove --evaluate flag when --run_type is 'forecasting'.") sys.exit(1) - if args.run_type in ['calibration', 'testing'] and not args.train and not args.evaluate and not args.sweep: + if (args.run_type in ['calibration', 'testing', 'forecasting'] + and not args.train and not args.evaluate and not args.forecast and not args.sweep): print(f"Error: Run type is {args.run_type} but neither --train, --evaluate, nor --sweep flag is set. Nothing to do... Exiting.") print("To fix: Add --train and/or --evaluate flag. Or use --sweep to run both training and evaluation in a WadnB sweep loop.") sys.exit(1) + if args.train and args.artifact_name: + print("Error: Both --train and --artifact_name flags are set. Exiting.") + print("To fix: Remove --artifact_name if --train is set, or vice versa.") + sys.exit(1) + + if args.forecast and args.run_type != 'forecasting': + print("Error: --forecast flag can only be used with --run_type forecasting. Exiting.") + print("To fix: Set --run_type to forecasting if --forecast is flagged.") + sys.exit(1) + # notes on stepshifted models: # There will be some thinking here in regards to how we store, denote (naming convention), and retrieve the model artifacts from stepshifted models. @@ -72,5 +85,3 @@ def validate_arguments(args): # And the rest of the code maded in a way to handle this naming convention without any issues. Could be a simple fix. # Alternatively, we could store the model artifacts in a subfolder for each stepshifted model. This would make it easier to handle the artifacts, but it would also make it harder to retrieve the latest artifact for a given run type. # Lastly, the solution Xiaolong is working on might allow us the store multiple models (steps) in one artifact, which would make this whole discussion obsolete and be the best solution. - - diff --git a/common_utils/utils_evaluation_metrics.py b/common_utils/utils_evaluation_metrics.py index 579b7d0d..46c9be84 100644 --- a/common_utils/utils_evaluation_metrics.py +++ b/common_utils/utils_evaluation_metrics.py @@ -1,6 +1,16 @@ + from dataclasses import dataclass from typing import Optional import pandas as pd +from statistics import mean, stdev, median + +import properscoring as ps +from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_squared_log_error, brier_score_loss, average_precision_score, roc_auc_score +from views_forecasts.extensions import * + + + +# MUST BE ALIGNED WITH THE METRICS WE DECIDE TO USE IN THE WORKSHOP!!!! @dataclass class EvaluationMetrics: @@ -75,4 +85,67 @@ def evaluation_dict_to_dataframe(evaluation_dict: dict) -> pd.DataFrame: """ return pd.DataFrame.from_dict(evaluation_dict, orient='index') -# TBD: Align with metrics discussed in workshop + @staticmethod + def calculate_aggregate_metrics(evaluation_dict: dict) -> dict: + metrics_aggregate = { + 'mean': {}, + 'std': {}, + 'median': {} + } + + for metric in EvaluationMetrics.__annotations__.keys(): + metric_values = [getattr(evaluation, metric) for evaluation in evaluation_dict.values() if getattr(evaluation, metric) is not None] + if metric_values: + metrics_aggregate['mean'][metric] = mean(metric_values) + metrics_aggregate['std'][metric] = stdev(metric_values) + metrics_aggregate['median'][metric] = median(metric_values) + else: + metrics_aggregate['mean'][metric] = None + metrics_aggregate['std'][metric] = None + metrics_aggregate['median'][metric] = None + + return metrics_aggregate + + @staticmethod + def output_metrics(evaluation_dict): + aggregate = EvaluationMetrics.calculate_aggregate_metrics(evaluation_dict) + step_metrics_dict = {step: vars(metrics) for step, metrics in evaluation_dict.items()} + step_metrics_dict['mean'] = aggregate['mean'] + step_metrics_dict['std'] = aggregate['std'] + step_metrics_dict['median'] = aggregate['median'] + return step_metrics_dict + + +def generate_metric_dict(df, config): + """ + Generates a dictionary of evaluation metrics for a given forecasting configuration and dataset. + + Args: + df (pd.DataFrame): A pandas DataFrame containing the forecasted values and ground truth. + config (dict): A dictionary containing the forecasting configuration parameters. + + Returns: + evaluation_dict (dict): A dictionary of EvaluationMetrics instances for each forecasting step. + df_evaluation_dict (pd.DataFrame): A pandas DataFrame containing the evaluation metrics for each forecasting step. + + Note: + ! This function is temporary for the stepshifter model. + ! Change the metrics to those discussed previously. + For logged targets, calculating MSE is actually MSLE. + KLD and Jeffreys divergence are measures used to quantify the difference between two probability distributions. Why do we calculate these metrics in the context of forecasting? + Brier score is used for binary and categorical outcomes that can be structured as true or false + There are no classes in data, so we cannot calculate roc_auc_score, ap_score + """ + + evaluation_dict = EvaluationMetrics.make_evaluation_dict(steps=config.steps[-1]) + for step in config.steps: + evaluation_dict[f"step{str(step).zfill(2)}"].MSE = mean_squared_error(df[config.depvar], df[f"step_pred_{step}"]) + evaluation_dict[f"step{str(step).zfill(2)}"].MAE = mean_absolute_error(df[config.depvar], df[f"step_pred_{step}"]) + # evaluation_dict[f"step{str(step).zfill(2)}"].MSLE = mean_squared_log_error(df[config.depvar], df[f"step_pred_{step}"]) + evaluation_dict[f"step{str(step).zfill(2)}"].CRPS = ps.crps_ensemble(df[config.depvar], df[f"step_pred_{step}"]).mean() + # evaluation_dict[f"step{str(step).zfill(2)}"].Brier = brier_score_loss(df[config.depvar], df[f"step_pred_{step}"]) + # evaluation_dict[f"step{str(step).zfill(2)}"].AUC = roc_auc_score(df[config.depvar], df[f"step_pred_{step}"]) + # evaluation_dict[f"step{str(step).zfill(2)}"].AP = average_precision_score(df[config.depvar], df[f"step_pred_{step}"]) + evaluation_dict = EvaluationMetrics.output_metrics(evaluation_dict) + df_evaluation_dict = EvaluationMetrics.evaluation_dict_to_dataframe(evaluation_dict) + return evaluation_dict, df_evaluation_dict diff --git a/common_utils/utils_input_data.py b/common_utils/utils_input_data.py new file mode 100644 index 00000000..96737efc --- /dev/null +++ b/common_utils/utils_input_data.py @@ -0,0 +1,17 @@ +import numpy as np + +def ensure_float64(df): + """ + Check if the DataFrame only contains np.float64 types. If not, raise a warning + and convert the DataFrame to use np.float64 for all its numeric columns. + """ + + non_float64_cols = df.select_dtypes(include=['number']).columns[df.select_dtypes(include=['number']).dtypes != np.float64] + + if len(non_float64_cols) > 0: + print(f"Warning: DataFrame contains non-np.float64 numeric columns. Converting the following columns: {', '.join(non_float64_cols)}") + + for col in non_float64_cols: + df[col] = df[col].astype(np.float64) + + return df \ No newline at end of file diff --git a/common_utils/utils_model_outputs.py b/common_utils/utils_model_outputs.py index dc79a454..4b142bbe 100644 --- a/common_utils/utils_model_outputs.py +++ b/common_utils/utils_model_outputs.py @@ -2,6 +2,11 @@ from typing import List, Optional import pandas as pd + +# we need to figure out if we are storing logged fatalities or not +# And this is also a good place to decide on the uncertainty quantification. Right now var, but maybe HDI or something else. +# you migth also waht the a non-step specifci list of pgm? So you can rectrate the full df from here? Otherwsie this could turn into a mess + @dataclass class ModelOutputs: """ @@ -17,7 +22,7 @@ class ModelOutputs: pg_id (Optional[List[int]]): The priogrid id. c_id (Optional[List[int]]): The country id. month_id (Optional[List[int]]): The month id. - step (Optional[List[int]]): The step ahead forecast. + out_sample_month (Optional[List[int]]): The step ahead forecast. """ y_score: Optional[List[float]] = field(default_factory=list) @@ -29,7 +34,7 @@ class ModelOutputs: pg_id: Optional[List[int]] = field(default_factory=list) c_id: Optional[List[int]] = field(default_factory=list) month_id: Optional[List[int]] = field(default_factory=list) - step: Optional[List[int]] = field(default_factory=list) + out_sample_month: Optional[List[int]] = field(default_factory=list) @classmethod def make_output_dict(cls, steps=36) -> dict: @@ -103,6 +108,36 @@ def output_dict_to_dataframe(dict_of_outputs) -> pd.DataFrame: return df -# we need to figure out if we are storing logged fatalities or not -# And this is also a good place to decide on the uncertainty quantification. Right now var, but maybe HDI or something else. -# you might also want the a non-step specific list of pgm? So you can recreate the full df from here? Otherwise this could turn into a mess + +def generate_output_dict(df, config): + """ + Generate a dictionary of ModelOutputs instances and a DataFrame from a DataFrame of model predictions. + + This function takes a DataFrame of model predictions and a configuration object, and generates a dictionary of ModelOutputs instances + + Args: + df (pd.DataFrame): A DataFrame containing model predictions. + config (dict): A configuration object containing model settings. + + Returns: + output_dict (dict): A dictionary where each key is a step label and each value is an instance of ModelOutputs. + df_output_dict (pd.DataFrame): A DataFrame of model outputs. + + Note: + ! This is temporary for stepshifter model + """ + output_dict = ModelOutputs.make_output_dict(steps=config.steps[-1]) + for step in config.steps: + df_step = df[[config.depvar, f"step_pred_{step}"]] + output_dict[f"step{str(step).zfill(2)}"].y_true = df_step[config.depvar].to_list() + output_dict[f"step{str(step).zfill(2)}"].y_score = df_step[f"step_pred_{step}"].to_list() + output_dict[f"step{str(step).zfill(2)}"].month_id = df_step.index.get_level_values("month_id").to_list() + if df.index.names[1] == "priogrid_gid": + output_dict[f"step{str(step).zfill(2)}"].pg_id = df_step.index.get_level_values("priogrid_gid").to_list() + elif df.index.names[1] == "country_id": + output_dict[f"step{str(step).zfill(2)}"].c_id = df_step.index.get_level_values("country_id").to_list() + output_dict[f"step{str(step).zfill(2)}"].out_sample_month = step + df_output_dict = ModelOutputs.output_dict_to_dataframe(output_dict) + df_output_dict = df_output_dict.reset_index() + df_output_dict = df_output_dict.drop(columns=df_output_dict.columns[0]) + return output_dict, df_output_dict \ No newline at end of file diff --git a/common_utils/views_stepshift/run.py b/common_utils/views_stepshift/run.py index f27dcca2..094d1692 100644 --- a/common_utils/views_stepshift/run.py +++ b/common_utils/views_stepshift/run.py @@ -132,12 +132,12 @@ def future_point_predict(self, time: int, data: pd.DataFrame, keep_specific: boo if proba: predictions = self._models.predict_proba( data.loc[time - self._models._steps_extent: time], - combine=True + combine=False ) else: predictions = self._models.predict( data.loc[time - self._models._steps_extent: time], - combine = True + combine =False ) if not keep_specific: diff --git a/common_utils/views_stepshifter_darts/__init__.py b/common_utils/views_stepshifter_darts/__init__.py new file mode 100644 index 00000000..e8550abf --- /dev/null +++ b/common_utils/views_stepshifter_darts/__init__.py @@ -0,0 +1,2 @@ +from .stepshifter_darts import StepshifterModel +from darts.models import LightGBMModel, XGBModel, RandomForest \ No newline at end of file diff --git a/common_utils/views_stepshifter_darts/stepshifter_darts.py b/common_utils/views_stepshifter_darts/stepshifter_darts.py new file mode 100644 index 00000000..a1b5f504 --- /dev/null +++ b/common_utils/views_stepshifter_darts/stepshifter_darts.py @@ -0,0 +1,173 @@ +import pickle +import numpy as np +import pandas as pd +from darts import TimeSeries +from darts.models import LightGBMModel, XGBModel +from darts.models.forecasting.forecasting_model import ModelMeta +import warnings +warnings.filterwarnings("ignore") +import time +from typing import List, Dict + +from views_forecasts.extensions import * +from .validation import views_validate +from utils import get_parameters + + +class StepshifterModel: + def __init__(self, config: Dict, partitioner_dict: Dict[str, List[int]]): + self._initialize_model(config) + + self.steps = config['steps'] + self.target = config['depvar'] + + self._params = get_parameters(config) + self._steps_extent = max(self.steps) + self._train_start, self._train_end = partitioner_dict['train'] + self._test_start, self._test_end = partitioner_dict['predict'] + + self._models = {} + self._independent_variables = None + self._time = None + self._level = None + self._series = None + + + @views_validate + def fit(self, df: pd.DataFrame): + self._setup(df) + self._prepare_time_series(df) + self._fit_models() + + + @views_validate + def predict(self, df: pd.DataFrame) -> pd.DataFrame: + pred = self._predict_models() + pred[self.target] = df.loc[(slice(self._test_start, self._test_end),),:][self.target] + return pred + + + def _initialize_model(self, config: Dict): + self.clf = globals()[config['algorithm']] + if not isinstance(self.clf, ModelMeta): + raise ValueError(f"Model {config['algorithm']} is not a valid Darts forecasting model. Change the model in the config file.") + + + def _setup(self, df: pd.DataFrame): + self._time = df.index.names[0] + self._level = df.index.names[1] + self._independent_variables = [c for c in df.columns if c != self.target] + + + def _prepare_time_series(self, df: pd.DataFrame): + df_reset = df.reset_index(level=[1]) + self._series = TimeSeries.from_group_dataframe(df_reset, group_cols=self._level, + value_cols=self._independent_variables + [self.target]) + + + def _fit_models(self): + target = [series.slice(self._train_start, self._train_end + 1)[self.target] + for series in self._series] # ts.slice is different from df.slice + past_cov = [series.slice(self._train_start, self._train_end + 1)[self._independent_variables] + for series in self._series] + for step in self.steps: + model = self.clf(lags_past_covariates=[-step], **self._params) + model.fit(target, past_covariates=past_cov) + self._models[step] = model + + + def _predict_models(self): + target = [series.slice(self._train_start, self._train_end + 1)[self.target] + for series in self._series] + + preds_by_step = [self._predict_for_step(step, target) for step in self.steps] + return pd.concat(preds_by_step, axis=1) + + + def _predict_for_step(self, step, target): + model = self._models[step] + horizon = self._test_end - self._test_start + 1 + ts_pred = model.predict(n=horizon, + series=target, + # darts automatically locates the time period of past_covariates + past_covariates=[series[self._independent_variables] for series in self._series], + show_warnings=False) + return self._process_predictions(ts_pred, step) + + + def _process_predictions(self, ts_pred, step): + preds = [] + for pred in ts_pred: + df_pred = pred.pd_dataframe() + df_pred.index = pd.MultiIndex.from_product([df_pred.index, [pred.static_covariates.iloc[0, 0]]]) + df_pred.index.names = [self._time, self._level] + df_pred.columns = [f"step_pred_{step}"] + df_pred = df_pred.loc[slice(self._test_start, self._test_end, ), :] + preds.append(df_pred) + return pd.concat(preds).sort_index() + + + def save(self, path: str): + try: + with open(path, "wb") as file: + pickle.dump(self, file) + print(f"Model successfully saved to {path}") + except Exception as e: + print(f"Failed to save model: {e}") + + + @property + def models(self): + return self._models.values() + +''' +if __name__ == "__main__": + month = [*range(1, 600)] + pg = [123, 456] + idx = pd.MultiIndex.from_product([month, pg], names=['month_id', 'priogrid_gid']) + df = pd.DataFrame(index=idx) + df['ged_sb_dep'] = df.index.get_level_values(0).astype(float) + df['ln_ged_sb'] = df.index.get_level_values(0) + df.index.get_level_values(1) / 1000 + df['ln_pop_gpw_sum'] = df.index.get_level_values(0) * 10 + df.index.get_level_values(1) / 1000 + steps = [*range(1, 3 + 1, 1)] + partitioner_dict = {"train": (121, 131), "predict": (132, 135)} + target = 'ged_sb_dep' + + # df = pd.read_parquet('raw.parquet') + # steps = [*range(1, 36 + 1, 1)] + # partitioner_dict = {"train": (121, 444), "predict": (445, 492)} + # target = df.forecasts.target + # + start_t = time.time() + + hp_config = { + "name": "orange_pasta", + "algorithm": "LightGBMModel", + "depvar": "ged_sb_dep", + "steps": [*range(1, 36 + 1, 1)], + "parameters": { + "learning_rate": 0.01, + "n_estimators": 100, + "num_leaves": 31, + } + } + + stepshifter = StepshifterModel(hp_config, partitioner_dict) + stepshifter.fit(df) + stepshifter.save('./model.pkl') + + train_t = time.time() + minutes = (train_t - start_t) / 60 + print(f'Done training. Runtime: {minutes:.3f} minutes') + + # stepshift = pd.read_pickle('./model.pkl') + pred = stepshifter.predict() + pred.to_parquet('pred.parquet') + + end_t = time.time() + minutes = (end_t - train_t) / 60 + print(f'Done predicting. Runtime: {minutes:.3f} minutes') +''' + + + diff --git a/common_utils/views_stepshifter_darts/validation.py b/common_utils/views_stepshifter_darts/validation.py new file mode 100644 index 00000000..8426ecfa --- /dev/null +++ b/common_utils/views_stepshifter_darts/validation.py @@ -0,0 +1,24 @@ +import functools +import numpy as np +import pandas as pd + +class ValidationError(Exception): + pass + +def dataframe_is_right_format(dataframe: pd.DataFrame): + try: + assert len(dataframe.index.levels) == 2 + except AssertionError: + raise ValidationError("Dataframe must have a two-level index") + + try: + assert set(dataframe.dtypes) == {np.dtype(float)} + except AssertionError: + raise ValidationError("The dataframe must contain only np.float64 floats") + +def views_validate(fn): + @functools.wraps(fn) + def inner(*args,**kwargs): + dataframe_is_right_format(args[-1]) + return fn(*args,**kwargs) + return inner diff --git a/documentation/model.md b/documentation/model.md index 5c9a4315..37d8ecfb 100644 --- a/documentation/model.md +++ b/documentation/model.md @@ -4,7 +4,7 @@ In the context of the VIEWS pipeline, a model should be understood as: 1) A specific instantiation of a machine learning algorithm, -2) Trained using a predetermined and unique set of hyperpara.meters, +2) Trained using a predetermined and unique set of hyperparameters, 3) On a well-defined set of input features, 4) And targeting a specific outcome target. 5) In the case of stepshift models, a model is understood as **all** code and **all** artifacts necessary to generate a comprehensive 36 month forecast for the specified target. diff --git a/models/README.md b/models/README.md index b727d8c9..90b18d4c 100644 --- a/models/README.md +++ b/models/README.md @@ -1,5 +1,8 @@ +# Model Overview + | model | algorithm | queryset | | -------------------------------------------------- | ------ | ------ | +| [black_lodge](https://github.com/prio-data/views_pipeline/tree/main/models/black_lodge) | XGBRFRegressor | fatalities002_baseline | | [blank_space](https://github.com/prio-data/views_pipeline/tree/main/models/blank_space) | HurdleRegression | fatalities003_pgm_natsoc | | [electric_relaxation](https://github.com/prio-data/views_pipeline/tree/main/models/electric_relaxation) | RandomForestClassifier | escwa001_cflong | | [lavender_haze](https://github.com/prio-data/views_pipeline/tree/main/models/lavender_haze) | HurdleRegression | fatalities003_pgm_broad | @@ -7,3 +10,13 @@ | [purple_alien](https://github.com/prio-data/views_pipeline/tree/main/models/purple_alien) | HydraNet | simon_tests | | [wildest_dream](https://github.com/prio-data/views_pipeline/tree/main/models/wildest_dream) | HurdleRegression | fatalities003_pgm_conflict_sptime_dist | | [yellow_pikachu](https://github.com/prio-data/views_pipeline/tree/main/models/yellow_pikachu) | XGBRegressor | fatalities003_pgm_conflict_treelag | + +# Model Definition +In the context of the VIEWS pipeline, a model should be understood as: + +1) A specific instantiation of a machine learning algorithm, +2) Trained using a predetermined and unique set of hyperparameters, +3) On a well-defined set of input features, +4) And targeting a specific outcome target. +5) In the case of stepshift models, a model is understood as **all** code and **all** artifacts necessary to generate a comprehensive 36 month forecast for the specified target. +6) Note that, two models, identical in all other aspects, will be deemed distinct if varying post-processing techniques are applied to their generated predictions. For instance, if one model's predictions undergo calibration or normalization while the other's do not. \ No newline at end of file diff --git a/models/black_lodge/README.md b/models/black_lodge/README.md new file mode 100644 index 00000000..c639675e --- /dev/null +++ b/models/black_lodge/README.md @@ -0,0 +1,18 @@ +# black_lodge +## Overview +This folder contains code for black_lodge model, a baseline random forest for predicting fatalities, also known as fatalities002_baseline_rf in the [documentation paper of fatalities002](https://viewsforecasting.org/wp-content/uploads/VIEWS_documentation_models_Fatalities002.pdf). + +The model input data is the qs_baseline, and its algorithm is a random forest regressor with an XGBoost implementation (XGBRFRegressor). + +This is very simple model with only five data columns (each column representing one feature): The number of fatalities in the same country at $t-1$, three decay functions of time since there was at least five fatalities in a single month, for each of the UCDP conflict types -- state-based, one-sided, or non-state conflict -- and log population size (Hegre2020RP,Pettersson2021JPR).The features in the baseline are included in all the models described below. This ensures that all models in the ensemble provides at least moderately good predictions, while guaranteeing diversity in feature sets and modelling approaches. + +## To-Dos +- [x] Take over model configs from [viewsforecasting](https://github.com/prio-data/viewsforecasting/blob/4dbc2cd2b6edb3169fc585f7dbb868b65fab0e2c/SystemUpdates/ModelDefinitions.py#L36) +- [x] Tidy config files +- [x] Dataloader: Rewrite queryset for vimur +- [x] Training script +- [ ] Forecasting script +- [ ] Evaluation script +- [ ] Test management script +- [ ] Test main.py +- [ ] Log on wandb \ No newline at end of file diff --git a/models/black_lodge/artifacts/.gitkeep b/models/black_lodge/artifacts/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/models/black_lodge/configs/config_deployment.py b/models/black_lodge/configs/config_deployment.py new file mode 100644 index 00000000..44bd8457 --- /dev/null +++ b/models/black_lodge/configs/config_deployment.py @@ -0,0 +1,15 @@ +def get_deployment_config(): + + """ + Contains the configuration for deploying the model into different environments. + This configuration is "behavioral" so modifying it will affect the model's runtime behavior and integration into the deployment system. + + Returns: + - deployment_config (dict): A dictionary containing deployment settings, determining how the model is deployed, including status, endpoints, and resource allocation. + """ + + deployment_config = { + "deployment_status": "baseline", # shadow, deployed, baseline, or deprecated + } + + return deployment_config \ No newline at end of file diff --git a/models/black_lodge/configs/config_hyperparameters.py b/models/black_lodge/configs/config_hyperparameters.py new file mode 100644 index 00000000..fa330ccc --- /dev/null +++ b/models/black_lodge/configs/config_hyperparameters.py @@ -0,0 +1,9 @@ +def get_hp_config(): + hp_config = { + "steps": [*range(1, 36 + 1, 1)], + "parameters": { + "n_estimators": 300, + "n_jobs": 12 + } + } + return hp_config \ No newline at end of file diff --git a/models/black_lodge/configs/config_input_data.py b/models/black_lodge/configs/config_input_data.py new file mode 100644 index 00000000..bb8f1714 --- /dev/null +++ b/models/black_lodge/configs/config_input_data.py @@ -0,0 +1,73 @@ +import numpy as np +from viewser import Queryset, Column + +def get_input_data(): + """" + Contains viewser queryset to fetch input data (queryset name, target variable, level of analysis, transformations, and theme). + + Returns: + - qs_baseline (Queryset): Fatalities conflict history, cm level. Predicting ln(fatalities) using conflict predictors, ultrashort. + + Note: + - Queryset taken from [viewsforecasting/Tools/cm_querysets.py](https://github.com/prio-data/viewsforecasting/blob/4dbc2cd2b6edb3169fc585f7dbb868b65fab0e2c/Tools/cm_querysets.py#L16) + - Queryset will be used in src/dataloaders/get_data.py to fetch data. + """ + + qs_baseline = (Queryset("fatalities002_baseline", "country_month") + + # target variable + .with_column(Column("ln_ged_sb_dep", from_loa="country_month", from_column="ged_sb_best_sum_nokgi") + .transform.ops.ln() + .transform.missing.fill() + ) + + # timelag 0 of target variable + .with_column(Column("ln_ged_sb", from_loa="country_month", from_column="ged_sb_best_sum_nokgi") + .transform.ops.ln() + .transform.missing.fill() + ) + # Decay functions + # state-based (sb) + .with_column(Column("decay_ged_sb_5", from_loa="country_month", from_column="ged_sb_best_sum_nokgi") + .transform.missing.replace_na() + .transform.bool.gte(5) + .transform.temporal.time_since() + .transform.temporal.decay(24) + .transform.missing.replace_na() + ) + # one-sided (os) + .with_column(Column("decay_ged_os_5", from_loa="country_month", from_column="ged_os_best_sum_nokgi") + .transform.missing.replace_na() + .transform.bool.gte(5) + .transform.temporal.time_since() + .transform.temporal.decay(24) + .transform.missing.replace_na() + ) + # Spatial lag decay + .with_column(Column("splag_1_decay_ged_sb_5", from_loa="country_month", + from_column="ged_sb_best_sum_nokgi") + .transform.missing.replace_na() + .transform.bool.gte(5) + .transform.temporal.time_since() + .transform.temporal.decay(24) + .transform.spatial.countrylag(1, 1, 0, 0) + .transform.missing.replace_na() + ) + + # From WDI + + .with_column(Column("wdi_sp_pop_totl", from_loa="country_year", from_column="wdi_sp_pop_totl") + .transform.missing.fill() + .transform.temporal.tlag(12) + .transform.missing.fill() + .transform.missing.replace_na() + ) + + .with_theme("fatalities") + .describe("""Fatalities conflict history, cm level + + Predicting ln(fatalities) using conflict predictors, ultrashort + + """) + ) + return qs_baseline \ No newline at end of file diff --git a/models/black_lodge/configs/config_meta.py b/models/black_lodge/configs/config_meta.py new file mode 100644 index 00000000..f5953592 --- /dev/null +++ b/models/black_lodge/configs/config_meta.py @@ -0,0 +1,18 @@ +def get_meta_config(): + """ + Contains the common configuration settings for the model (model architecture, name, target variable, level of analysis and deployment status). + + Returns: + - model_config (dict): A dictionary containing model configuration settings. + """ + meta_config = { + "name": "black_lodge", + "algorithm": "XGBRFRegressor", + "depvar": "ln_ged_sb_dep", + "queryset": "fatalities002_baseline", + "level": "cm", + "creator": "Sara", + "preprocessing": "float_it", #new + "data_train": "baseline002", #new + } + return meta_config #formely model_config \ No newline at end of file diff --git a/models/black_lodge/configs/config_sweep.py b/models/black_lodge/configs/config_sweep.py new file mode 100644 index 00000000..e57e174a --- /dev/null +++ b/models/black_lodge/configs/config_sweep.py @@ -0,0 +1,31 @@ +def get_sweep_config(): + + """ + Contains the configuration for hyperparameter sweeps using WandB. + This configuration is "operational" so modifying it will change the search strategy, parameter ranges, and other settings for hyperparameter tuning aimed at optimizing model performance. + + Returns: + - sweep_config (dict): A dictionary containing the configuration for hyperparameter sweeps, defining the methods and parameter ranges used to search for optimal hyperparameters. + """ + + sweep_config = { + 'name': 'black_lodge', + 'method': 'grid' + } + + metric = { + 'name': '36month_mean_squared_error', + 'goal': 'minimize' + } + + sweep_config['metric'] = metric + + parameters_dict = { + "n_estimators": {"values": [100, 200]}, + "learning_rate": {"values": [0.05]}, + "n_jobs": {"values": [12]}, + } #taken from xiaolong's code + + sweep_config['parameters'] = parameters_dict + + return sweep_config \ No newline at end of file diff --git a/models/black_lodge/data/generated/.gitkeep b/models/black_lodge/data/generated/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/models/black_lodge/data/processed/.gitkeep b/models/black_lodge/data/processed/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/models/black_lodge/data/raw/.gitkeep b/models/black_lodge/data/raw/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/models/black_lodge/main.py b/models/black_lodge/main.py new file mode 100644 index 00000000..4196939d --- /dev/null +++ b/models/black_lodge/main.py @@ -0,0 +1,32 @@ +import time +import wandb +import sys +from pathlib import Path + +PATH = Path(__file__) +sys.path.insert(0, str(Path( + *[i for i in PATH.parts[:PATH.parts.index("views_pipeline") + 1]]) / "common_utils")) # PATH_COMMON_UTILS +from set_path import setup_project_paths +setup_project_paths(PATH) + +from utils_cli_parser import parse_args, validate_arguments +from execute_model_runs import execute_sweep_run, execute_single_run + + +if __name__ == "__main__": + args = parse_args() + validate_arguments(args) + + # wandb login + wandb.login() + + start_t = time.time() + + if args.sweep == True: + execute_sweep_run(args) + elif args.sweep == False: + execute_single_run(args) + + end_t = time.time() + minutes = (end_t - start_t) / 60 + print(f'Done. Runtime: {minutes:.3f} minutes') \ No newline at end of file diff --git a/models/black_lodge/notebooks/.gitkeep b/models/black_lodge/notebooks/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/models/black_lodge/reports/.gitkeep b/models/black_lodge/reports/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/models/black_lodge/src/dataloaders/get_data.py b/models/black_lodge/src/dataloaders/get_data.py new file mode 100644 index 00000000..5b2697f7 --- /dev/null +++ b/models/black_lodge/src/dataloaders/get_data.py @@ -0,0 +1,86 @@ +import sys +from pathlib import Path +import pandas as pd +import numpy as np + +PATH = Path(__file__) +sys.path.insert(0, str(Path(*[i for i in PATH.parts[:PATH.parts.index("views_pipeline")+1]]) / "common_utils")) +from set_path import setup_project_paths, setup_data_paths +setup_project_paths(PATH) + +from utils_input_data import ensure_float64 +from set_partition import get_partitioner_dict +from config_input_data import get_input_data + +def get_data(): + """ + Fetches the input data from the viewser queryset defined in get_input_data function of config_input_data.py + Checks if the data is already saved as a parquet file, if not, fetches the data and saves it as a parquet file. + Also ensures that all numeric columns are of type np.float64. + + Returns: + - data (pd.DataFrame): Input data fetched from the viewser queryset. + """ + print("Getting data...") + PATH_RAW, _, _ = setup_data_paths(PATH) + parquet_path = PATH_RAW / 'raw.parquet' + # print('PARQUET PATH', parquet_path) # use this to debug if the parquet path is correct + if not parquet_path.exists(): + qs = get_input_data() + data = qs.publish().fetch() + data = ensure_float64(data) + data.to_parquet(parquet_path) + else: + data = pd.read_parquet(parquet_path) + + return data + +def get_partition_data(df, partition): + """ + Temporally subsets the input data based on the partition. + + Args: + - df (pd.DataFrame): Input data fetched from the viewser queryset. + - partition (str): The partition of the data (calibration, testing, forecasting). + + Returns: + - df (pd.DataFrame): Temporally subsetted data based on the partition. + """ + + partitioner_dict = get_partitioner_dict(partition) + + month_first = partitioner_dict['train'][0] + + if partition == 'forecasting': + month_last = partitioner_dict['train'][1] + 1 # no need to get the predict months as these are empty + + elif partition == 'calibration' or partition == 'testing': + month_last = partitioner_dict['predict'][1] + 1 # predict[1] is the last month to predict, so we need to add 1 to include it. + + else: + raise ValueError('partition should be either "calibration", "testing" or "forecasting"') + + month_range = np.arange(month_first, month_last,1) # predict[1] is the last month to predict, so we need to add 1 to include it. + + df = df[df.index.get_level_values("month_id").isin(month_range)].copy() # temporal subset + + return df + + +def check_data(): + """ + Check missingness and infinity values in the input data + + Taken from viewsforecasting repo, to be further developed. + """ + data = get_data() + print("Checking missingness and infinity values in the input data...") + print("Missing values in the input data:") + print(data.isnull().sum()) + print("Infinity values in the input data:") + print(data.isin([np.inf, -np.inf]).sum()) + + +if __name__ == "__main__": + get_data() + print("Data fetched successfully.") \ No newline at end of file diff --git a/models/black_lodge/src/forecasting/generate_forecast.py b/models/black_lodge/src/forecasting/generate_forecast.py new file mode 100644 index 00000000..62519d20 --- /dev/null +++ b/models/black_lodge/src/forecasting/generate_forecast.py @@ -0,0 +1,50 @@ +import sys +import pandas as pd +from pathlib import Path +import pickle + +PATH = Path(__file__) +sys.path.insert(0, str(Path( + *[i for i in PATH.parts[:PATH.parts.index("views_pipeline") + 1]]) / "common_utils")) # PATH_COMMON_UTILS +from set_path import setup_project_paths, setup_data_paths, setup_artifacts_paths +setup_project_paths(PATH) + +from set_partition import get_partitioner_dict +from utils import get_partition_data, get_standardized_df +from utils_artifacts import get_latest_model_artifact + + +def forecast_model_artifact(config, artifact_name): + PATH_RAW, _, PATH_GENERATED = setup_data_paths(PATH) + PATH_ARTIFACTS = setup_artifacts_paths(PATH) + + # if an artifact name is provided through the CLI, use it. + # Otherwise, get the latest model artifact based on the run type + if artifact_name: + print(f"Using (non-default) artifact: {artifact_name}") + + if not artifact_name.endswith('.pkl'): + artifact_name += '.pkl' + PATH_ARTIFACT = PATH_ARTIFACTS / artifact_name + else: + # use the latest model artifact based on the run type + print(f"Using latest (default) run type ({config.run_type}) specific artifact") + PATH_ARTIFACT = get_latest_model_artifact(PATH_ARTIFACTS, config.run_type) + + config["timestamp"] = PATH_ARTIFACT.stem[-15:] + dataset = pd.read_parquet(PATH_RAW / f'raw_{config.run_type}.parquet') + + try: + stepshift_model = pd.read_pickle(PATH_ARTIFACT) + except: + raise FileNotFoundError(f"Model artifact not found at {PATH_ARTIFACT}") + + partition = get_partitioner_dict(config.run_type)['predict'] + df_predictions = stepshift_model.future_point_predict(partition[0]-1, + get_partition_data(dataset, config.run_type), + keep_specific=True) + df_predictions = get_standardized_df(df_predictions, config.run_type) + + predictions_path = f'{PATH_GENERATED}/predictions_{config.steps[-1]}_{config.run_type}_{config.timestamp}.pkl' + with open(predictions_path, 'wb') as file: + pickle.dump(df_predictions, file) \ No newline at end of file diff --git a/models/black_lodge/src/management/execute_model_runs.py b/models/black_lodge/src/management/execute_model_runs.py new file mode 100644 index 00000000..a766aca9 --- /dev/null +++ b/models/black_lodge/src/management/execute_model_runs.py @@ -0,0 +1,49 @@ +import sys +from pathlib import Path +import wandb + +PATH = Path(__file__) +sys.path.insert(0, str(Path( + *[i for i in PATH.parts[:PATH.parts.index("views_pipeline") + 1]]) / "common_utils")) # PATH_COMMON_UTILS +from set_path import setup_project_paths +setup_project_paths(PATH) + +from config_hyperparameters import get_hp_config +from config_meta import get_meta_config +from config_sweep import get_sweep_config +from execute_model_tasks import execute_model_tasks +from get_data import get_data +from utils import update_hp_config, update_sweep_config + + +def execute_sweep_run(args): + get_data(args) + + sweep_config = get_sweep_config() + meta_config = get_meta_config() + update_sweep_config(sweep_config, args, meta_config) + + project = f"{sweep_config['name']}_sweep" # we can name the sweep in the config file + sweep_id = wandb.sweep(sweep_config, project=project, entity='views_pipeline') + wandb.agent(sweep_id, execute_model_tasks, entity='views_pipeline') + + +def execute_single_run(args): + get_data(args) + + hp_config = get_hp_config() + meta_config = get_meta_config() + update_hp_config(hp_config, args, meta_config) + + project = f"{hp_config['name']}_{args.run_type}" + + if args.run_type == 'calibration' or args.run_type == 'testing': + execute_model_tasks(config=hp_config, project=project, train=args.train, eval=args.evaluate, + forecast=False, artifact_name=args.artifact_name) + + elif args.run_type == 'forecasting': + execute_model_tasks(config=hp_config, project=project, train=args.train, eval=False, forecast=args.forecast, + artifact_name=args.artifact_name) + + else: + raise ValueError(f"Invalid run type: {args.run_type}") \ No newline at end of file diff --git a/models/black_lodge/src/management/execute_model_tasks.py b/models/black_lodge/src/management/execute_model_tasks.py new file mode 100644 index 00000000..82985eef --- /dev/null +++ b/models/black_lodge/src/management/execute_model_tasks.py @@ -0,0 +1,74 @@ +import sys +from pathlib import Path +import wandb + +PATH = Path(__file__) +sys.path.insert(0, str(Path( + *[i for i in PATH.parts[:PATH.parts.index("views_pipeline") + 1]]) / "common_utils")) # PATH_COMMON_UTILS +from set_path import setup_project_paths +setup_project_paths(PATH) + +from evaluate_model import evaluate_model_artifact +from evaluate_sweep import evaluate_sweep +from generate_forecast import forecast_model_artifact +from train_model import train_model_artifact +from utils import get_model, split_hurdle_parameters +from utils_wandb import add_wandb_monthly_metrics + + +def execute_model_tasks(config=None, project=None, train=None, eval=None, forecast=None, artifact_name=None): + """ + Executes various model-related tasks including training, evaluation, and forecasting. + + This function manages the execution of different tasks such as training the model, + evaluating an existing model, or performing forecasting. + It also initializes the WandB project. + + Args: + config: Configuration object containing parameters and settings. + project: The WandB project name. + train: Flag to indicate if the model should be trained. + eval: Flag to indicate if the model should be evaluated. + forecast: Flag to indicate if forecasting should be performed. + artifact_name (optional): Specific name of the model artifact to load for evaluation or forecasting. + """ + + # Initialize WandB + with wandb.init(project=project, entity="views_pipeline", + config=config): # project and config ignored when running a sweep + + # add the monthly metrics to WandB + add_wandb_monthly_metrics() + + # Update config from WandB initialization above + config = wandb.config + + # W&B does not directly support nested dictionaries for hyperparameters + # This will make the sweep config super ugly, but we don't have to distinguish between sweep and single runs + if config['sweep'] and config['algorithm'] == "HurdleRegression": + config['parameters'] = {} + config['parameters']['clf'], config['parameters']['reg'] = split_hurdle_parameters(config) + + model = get_model(config) + print(model) + + if config['sweep']: + print("Sweeping...") + stepshift_model = train_model_artifact(config, model) + print("Evaluating...") + evaluate_sweep(config, stepshift_model) + + + # Handle the single model runs: train and save the model as an artifact + if train: + print("Training...") + train_model_artifact(config, model) + + # Handle the single model runs: evaluate a trained model (artifact) + if eval: + print("Evaluating...") + evaluate_model_artifact(config, artifact_name) + + if forecast: + print("Forecasting...") + forecast_model_artifact(config, artifact_name) \ No newline at end of file diff --git a/models/black_lodge/src/offline_evaluation/evaluate_model.py b/models/black_lodge/src/offline_evaluation/evaluate_model.py new file mode 100644 index 00000000..a6f5374c --- /dev/null +++ b/models/black_lodge/src/offline_evaluation/evaluate_model.py @@ -0,0 +1,58 @@ +import sys +from pathlib import Path +import warnings +warnings.filterwarnings("ignore") +import wandb + +PATH = Path(__file__) +sys.path.insert(0, str(Path( + *[i for i in PATH.parts[:PATH.parts.index("views_pipeline") + 1]]) / "common_utils")) # PATH_COMMON_UTILS +from set_path import setup_project_paths, setup_data_paths, setup_artifacts_paths +setup_project_paths(PATH) + +from utils import save_model_outputs, get_partition_data, get_standardized_df +from utils_artifacts import get_latest_model_artifact +from utils_evaluation_metrics import generate_metric_dict +from utils_model_outputs import generate_output_dict +from utils_wandb import generate_wandb_log_dict +from views_forecasts.extensions import * + + +def evaluate_model_artifact(config, artifact_name): + PATH_RAW, _, PATH_GENERATED = setup_data_paths(PATH) + PATH_ARTIFACTS = setup_artifacts_paths(PATH) + + # if an artifact name is provided through the CLI, use it. + # Otherwise, get the latest model artifact based on the run type + if artifact_name: + print(f"Using (non-default) artifact: {artifact_name}") + + if not artifact_name.endswith('.pkl'): + artifact_name += '.pkl' + PATH_ARTIFACT = PATH_ARTIFACTS / artifact_name + else: + # use the latest model artifact based on the run type + print(f"Using latest (default) run type ({config.run_type}) specific artifact") + PATH_ARTIFACT = get_latest_model_artifact(PATH_ARTIFACTS, config.run_type) + + config["timestamp"] = PATH_ARTIFACT.stem[-15:] + dataset = pd.read_parquet(PATH_RAW / f'raw_{config.run_type}.parquet') + + try: + stepshift_model = pd.read_pickle(PATH_ARTIFACT) + except: + raise FileNotFoundError(f"Model artifact not found at {PATH_ARTIFACT}") + + df = stepshift_model.predict(config.run_type, "predict", get_partition_data(dataset, config.run_type)) + df = get_standardized_df(df, config.run_type) + + evaluation, df_evaluation = generate_metric_dict(df, config) + output, df_output = generate_output_dict(df, config) + for t in config.steps: + log_dict = {} + log_dict["monthly/out_sample_month"] = t + step = f"step{str(t).zfill(2)}" + log_dict = generate_wandb_log_dict(log_dict, evaluation, step) + wandb.log(log_dict) + + save_model_outputs(df_evaluation, df_output, PATH_GENERATED, config) \ No newline at end of file diff --git a/models/black_lodge/src/offline_evaluation/evaluate_sweep.py b/models/black_lodge/src/offline_evaluation/evaluate_sweep.py new file mode 100644 index 00000000..86f8f231 --- /dev/null +++ b/models/black_lodge/src/offline_evaluation/evaluate_sweep.py @@ -0,0 +1,36 @@ +from pathlib import Path +import pandas as pd +import wandb +from sklearn.metrics import mean_squared_error + +PATH = Path(__file__) +from set_path import setup_project_paths, setup_data_paths +setup_project_paths(PATH) + +from utils import get_partition_data, get_standardized_df +from utils_wandb import generate_wandb_log_dict +from utils_evaluation_metrics import generate_metric_dict + + +def evaluate_sweep(config, stepshift_model): + PATH_RAW, _, _ = setup_data_paths(PATH) + + dataset = pd.read_parquet(PATH_RAW / f'raw_{config.run_type}.parquet') + + df = stepshift_model.predict(config.run_type, "predict", get_partition_data(dataset, config.run_type)) + df = get_standardized_df(df, config.run_type) + + # Temporarily keep this because the metric to minimize is MSE + pred_cols = [f"step_pred_{str(i)}" for i in config.steps] + df["mse"] = df.apply(lambda row: mean_squared_error([row[config.depvar]] * 36, + [row[col] for col in pred_cols]), axis=1) + + wandb.log({'MSE': df['mse'].mean()}) + + evaluation, df_evaluation = generate_metric_dict(df, config) + for t in config.steps: + log_dict = {} + log_dict["monthly/out_sample_month"] = t + step = f"step{str(t).zfill(2)}" + log_dict = generate_wandb_log_dict(log_dict, evaluation, step) + wandb.log(log_dict) \ No newline at end of file diff --git a/models/black_lodge/src/training/train_model.py b/models/black_lodge/src/training/train_model.py new file mode 100644 index 00000000..02cc3408 --- /dev/null +++ b/models/black_lodge/src/training/train_model.py @@ -0,0 +1,41 @@ +from datetime import datetime +import pandas as pd +from pathlib import Path + +PATH = Path(__file__) +from set_path import setup_project_paths, setup_data_paths, setup_artifacts_paths +setup_project_paths(PATH) + +from set_partition import get_partitioner_dict +from stepshift.views import StepshiftedModels +from utils import get_partition_data +from views_forecasts.extensions import * +from views_partitioning.data_partitioner import DataPartitioner +from views_stepshift.run import ViewsRun + + +def train_model_artifact(config, model): + # print(config) + PATH_RAW, _, _ = setup_data_paths(PATH) + PATH_ARTIFACTS = setup_artifacts_paths(PATH) + + run_type = config['run_type'] + dataset = pd.read_parquet(PATH_RAW / f'raw_{run_type}.parquet') + + stepshift_model = stepshift_training(config, run_type, model, get_partition_data(dataset, run_type)) + if not config["sweep"]: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + model_filename = f"{config.run_type}_model_{timestamp}.pkl" + stepshift_model.save(PATH_ARTIFACTS / model_filename) + return stepshift_model + + +def stepshift_training(config, partition_name, model, dataset): + steps = config["steps"] + target = config["depvar"] + partitioner_dict = get_partitioner_dict(partition_name) + partition = DataPartitioner({partition_name: partitioner_dict}) + stepshift_def = StepshiftedModels(model, steps, target) + stepshift_model = ViewsRun(partition, stepshift_def) + stepshift_model.fit(partition_name, "train", dataset) + return stepshift_model \ No newline at end of file diff --git a/models/black_lodge/src/utils/utils.py b/models/black_lodge/src/utils/utils.py new file mode 100644 index 00000000..3b361c21 --- /dev/null +++ b/models/black_lodge/src/utils/utils.py @@ -0,0 +1,144 @@ +import sys +import numpy as np +from lightgbm import LGBMRegressor +from xgboost import XGBRegressor +from sklearn.ensemble import RandomForestClassifier +from pathlib import Path +import pickle + +PATH = Path(__file__) +sys.path.insert(0, str(Path( + *[i for i in PATH.parts[:PATH.parts.index("views_pipeline") + 1]]) / "common_utils")) # PATH_COMMON_UTILS +from set_path import setup_project_paths +setup_project_paths(PATH) + +from hurdle_model import HurdleRegression +from set_partition import get_partitioner_dict +from views_forecasts.extensions import * + + +def ensure_float64(df): + """ + Check if the DataFrame only contains np.float64 types. If not, raise a warning + and convert the DataFrame to use np.float64 for all its numeric columns. + """ + + non_float64_cols = df.select_dtypes(include=['number']).columns[df.select_dtypes(include=['number']).dtypes != np.float64] + + if len(non_float64_cols) > 0: + print(f"Warning: DataFrame contains non-np.float64 numeric columns. Converting the following columns: {', '.join(non_float64_cols)}") + + for col in non_float64_cols: + df[col] = df[col].astype(np.float64) + + return df + + +def get_model(config): + if config["algorithm"] == "HurdleRegression": + model = HurdleRegression(clf_name=config["model_clf"], reg_name=config["model_reg"], + clf_params=config["parameters"]["clf"], reg_params=config["parameters"]["reg"]) + else: + parameters = get_parameters(config) + model = globals()[config["algorithm"]](**parameters) + return model + + +def get_parameters(config): + ''' + Get the parameters from the config file. + If not sweep, then get directly from the config file, otherwise have to remove some parameters. + ''' + + if config["sweep"]: + keys_to_remove = ["algorithm", "depvar", "steps", "sweep", "run_type", "model_cls", "model_reg"] + parameters = {k: v for k, v in config.items() if k not in keys_to_remove} + else: + parameters = config["parameters"] + + return parameters + + +def get_partition_data(df, run_type): + partitioner_dict = get_partitioner_dict(run_type) + + month_first = partitioner_dict['train'][0] + + if run_type in ['calibration', 'testing']: + month_last = partitioner_dict['predict'][1] + 1 + elif run_type == 'forecasting': + month_last = partitioner_dict['predict'][0] + else: + raise ValueError('partition should be either "calibration", "testing" or "forecasting"') + + month_range = np.arange(month_first, month_last, 1) # predict[1] is the last month to predict, so we need to add 1 to include it. + + df = df[df.index.get_level_values("month_id").isin(month_range)].copy() # temporal subset + + return df + + +def get_standardized_df(df, run_type): + if run_type in ['calibration', 'testing']: + cols = [df.forecasts.target] + df.forecasts.prediction_columns + elif run_type == "forecasting": + cols = df.forecasts.prediction_columns + df = df.replace([np.inf, -np.inf], 0)[cols] + df = df.mask(df < 0, 0) + return df + + +def save_model_outputs(df_evaluation, df_output, PATH_GENERATED, config): + Path(PATH_GENERATED).mkdir(parents=True, exist_ok=True) + print(f'PATH to generated data: {PATH_GENERATED}') + + # Save the DataFrame of model outputs + outputs_path = f'{PATH_GENERATED}/df_output_{config.steps[-1]}_{config.run_type}_{config.timestamp}.pkl' + with open(outputs_path, 'wb') as file: + pickle.dump(df_output, file) + + # Save the DataFrame of evaluation metrics + evaluation_path = f'{PATH_GENERATED}/df_evaluation_{config.steps[-1]}_{config.run_type}_{config.timestamp}.pkl' + with open(evaluation_path, 'wb') as file: + pickle.dump(df_evaluation, file) + + +def split_hurdle_parameters(parameters_dict): + """ + Split the parameters dictionary into two separate dictionaries, one for the + classification model and one for the regression model. + """ + + cls_dict = {} + reg_dict = {} + + for key, value in parameters_dict.items(): + if key.startswith('cls_'): + cls_key = key.replace('cls_', '') + cls_dict[cls_key] = value + elif key.startswith('reg_'): + reg_key = key.replace('reg_', '') + reg_dict[reg_key] = value + + return cls_dict, reg_dict + + +def update_hp_config(hp_config, args, meta_config): + hp_config['run_type'] = args.run_type + hp_config['sweep'] = False + hp_config['name'] = meta_config['name'] + hp_config['depvar'] = meta_config['depvar'] + hp_config['algorithm'] = meta_config['algorithm'] + if meta_config['algorithm'] == 'HurdleRegression': + hp_config['model_clf'] = meta_config['model_clf'] + hp_config['model_reg'] = meta_config['model_reg'] + + +def update_sweep_config(sweep_config, args, meta_config): + sweep_config['parameters']['run_type'] = {'value': args.run_type} + sweep_config['parameters']['sweep'] = {'value': True} + sweep_config['parameters']['depvar'] = {'value': meta_config['depvar']} + sweep_config['parameters']['algorithm'] = {'value': meta_config['algorithm']} + if meta_config['algorithm'] == 'HurdleRegression': + sweep_config['parameters']['model_clf'] = {'value': meta_config['model_clf']} + sweep_config['parameters']['model_reg'] = {'value': meta_config['model_reg']} \ No newline at end of file diff --git a/models/black_lodge/src/utils/utils_wandb.py b/models/black_lodge/src/utils/utils_wandb.py new file mode 100644 index 00000000..5ba07f61 --- /dev/null +++ b/models/black_lodge/src/utils/utils_wandb.py @@ -0,0 +1,92 @@ +from pathlib import Path +import sys +import wandb + +PATH = Path(__file__) +sys.path.insert(0, str(Path( + *[i for i in PATH.parts[:PATH.parts.index("views_pipeline") + 1]]) / "common_utils")) # PATH_COMMON_UTILS +from set_path import setup_project_paths +setup_project_paths(PATH) + + +# there are things in other utils that should be here... + +def add_wandb_monthly_metrics(): + """ + Defines the WandB metrics for monthly evaluation. + + This function sets up the metrics for logging monthly evaluation metrics in WandB. + It defines a step metric called "monthly/out_sample_month" and specifies that any + metric under the "monthly" namespace will use "monthly/out_sample_month" as its step metric. + + Usage: + This function should be called at the start of a WandB run to configure + how metrics are tracked over time steps. + + Example: + >>> wandb.init(project="example_project") + >>> add_wandb_monthly_metrics() + >>> wandb.log({"monthly/mean_squared_error": 0.02, "monthly/out_sample_month": 1}) + + Notes: + - The step metric "monthly/out_sample_month" will be used to log metrics for each time (i.e. forecasted month). + - Any metric prefixed with "monthly/" will follow the "monthly/out_sample_month" step metric. + + See Also: + - `wandb.define_metric`: WandB API for defining metrics and their step relationships. + """ + + # Define "new" monthly metrics for WandB logging + wandb.define_metric("monthly/out_sample_month") + wandb.define_metric("monthly/*", step_metric="monthly/out_sample_month") + + +def generate_wandb_log_dict(log_dict, dict_of_eval_dicts, step): + """ + ---Adapted version from simon's code--- + + Adds evaluation metrics to a WandB log dictionary for a specific time step (i.e. forcasted month). + + This function updates the provided log dictionary with evaluation metrics from + a specified feature and step, formatted for WandB logging. It appends the metrics + to the log dictionary using the "monthly/{metric_name}" format. + + Args: + log_dict (dict): The log dictionary to be updated with new metrics. + dict_of_eval_dicts (Dict[str, Dict[str, EvaluationMetrics]]): A dictionary of evaluation metrics, + where the keys are feature identifiers and the values are dictionaries with time steps as keys + and `EvaluationMetrics` instances as values. + step (str): The specific time step (month forecasted) for which metrics are logged (e.g., 'step01'). + + Returns: + dict: The updated log dictionary with the evaluation metrics for the specified feature and step. + + Example: + >>> log_dict = {} + >>> dict_of_eval_dicts = { + ... 'step01': EvaluationMetrics(MSE=0.1, AP=0.2, AUC=0.3, Brier=0.4), + ... 'step02': EvaluationMetrics(MSE=0.2, AP=0.3, AUC=0.4, Brier=0.5), + ... + ... } + >>> log_dict = generate_wandb_log_dict(log_dict, dict_of_eval_dicts, 'step01') + >>> print(log_dict) + { + 'monthly/MSE': 0.1, + 'monthly/AP': 0.2, + 'monthly/AUC': 0.3, + 'monthly/Brier': 0.4 + } + + Notes: + - Only non-None values from the `EvaluationMetrics` instance are added to the log dictionary. + - The metrics are formatted with the "monthly/{metric_name}" naming convention for WandB logging. + + See Also: + - `wandb.log`: WandB API for logging metrics. + """ + + for key, value in dict_of_eval_dicts[step].items(): + if value is not None: + log_dict[f"monthly/{key}"] = value + + return log_dict \ No newline at end of file