diff --git a/jobs/kpi-forecasting/kpi_forecasting.py b/jobs/kpi-forecasting/kpi_forecasting.py
index e7dcca7c..81f2ceb7 100644
--- a/jobs/kpi-forecasting/kpi_forecasting.py
+++ b/jobs/kpi-forecasting/kpi_forecasting.py
@@ -1,13 +1,15 @@
 from kpi_forecasting.inputs import CLI, YAML
 from kpi_forecasting.models.prophet_forecast import ProphetForecast
 from kpi_forecasting.models.funnel_forecast import FunnelForecast
+from kpi_forecasting.models.scalar_forecast import ScalarForecast
 from kpi_forecasting.metric_hub import MetricHub
-
+from kpi_forecasting.metric_hub import ForecastDataPull
 
 # A dictionary of available models in the `models` directory.
 MODELS = {
     "prophet": ProphetForecast,
     "funnel": FunnelForecast,
+    "scalar": ScalarForecast,
 }
 
 
@@ -16,10 +18,15 @@ def main() -> None:
     config = YAML(filepath=CLI().args.config).data
     model_type = config.forecast_model.model_type
 
-    if model_type in MODELS:
-        metric_hub = MetricHub(**config.metric_hub)
-        model = MODELS[model_type](metric_hub=metric_hub, **config.forecast_model)
+    if "metric_hub" in dir(config):
+        data_puller = MetricHub(**config.metric_hub)
+    elif "forecast_data_pull" in dir(config):
+        data_puller = ForecastDataPull(**config.forecast_data_pull)
+    else:
+        raise KeyError("No metric_hub or forecast_data_pull key in config to pull data.")
 
+    if model_type in MODELS:
+        model = MODELS[model_type](metric_hub=data_puller, **config.forecast_model)
         model.fit()
         model.predict()
         model.summarize(**config.summarize)
diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py
index 1ebd482e..e57c0ef2 100644
--- a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py
+++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/__init__.py
@@ -1,7 +1,11 @@
 import attr
-from typing import List, Optional, Union
+from dataclasses import dataclass
+from datetime import datetime
+from dotmap import DotMap
 from pathlib import Path
+from typing import List, Optional, Union
+import pandas as pd
 
 from kpi_forecasting.inputs import YAML
 
@@ -9,9 +13,11 @@
 PARENT_PATH = Path(__file__).parent
 HOLIDAY_PATH = PARENT_PATH / "holidays.yaml"
 REGRESSOR_PATH = PARENT_PATH / "regressors.yaml"
+SCALAR_PATH = PARENT_PATH / "scalar_adjustments.yaml"
 
-holiday_collection = YAML(HOLIDAY_PATH)
-regressor_collection = YAML(REGRESSOR_PATH)
+HOLIDAY_COLLECTION = YAML(HOLIDAY_PATH)
+REGRESSOR_COLLECTION = YAML(REGRESSOR_PATH)
+SCALAR_ADJUSTMENTS = YAML(SCALAR_PATH)
 
 
 @attr.s(auto_attribs=True, frozen=False)
@@ -38,3 +44,85 @@ class ProphetHoliday:
     ds: List
     lower_window: int
     upper_window: int
+
+
+@dataclass
+class ScalarAdjustments:
+    """
+    Holds the name and dates where a scalar adjustment should be applied.
+
+    Args:
+        name (str): The name of the adjustment from the scalar_adjustments.yaml file.
+        adjustment_dotmap (DotMap): A parsed scalar_adjustments.yaml entry holding a
+            forecast_start_date and the per-segment adjustments.
+
+    Attributes set in __post_init__:
+        forecast_start_date (datetime): The first forecast_start_date where this iteration
+            of the adjustment should be applied. The adjustment applies to any subsequent
+            forecast until another update of this adjustment is made.
+        adjustments_dataframe (DataFrame): A DataFrame that contains the dimensions of the
+            segments being forecasted as columns, as well as the start dates and values for
+            each scalar adjustment.
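+
+    Example (illustrative): an `adjustment_dotmap` with forecast_start_date
+    "2024-01-01" and one segment {partner: "Google", country: "US", device:
+    "desktop", channel: "all"} carrying a single adjustment {start_date:
+    "2024-01-01", value: 1.03} yields a one-row `adjustments_dataframe` with
+    columns [partner, country, device, channel, start_date, value].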
+ """ + + name: str + adjustment_dotmap: DotMap + + def __post_init__(self): + adj_list = [] + self.forecast_start_date = datetime.strptime( + self.adjustment_dotmap.forecast_start_date, "%Y-%m-%d" + ) + for segment_dat in self.adjustment_dotmap.segments: + segment = {**segment_dat.segment} + segment_adjustment_dat = [ + {**segment, **adj} for adj in segment_dat.adjustments + ] + adj_list.append(pd.DataFrame(segment_adjustment_dat)) + + # Create a DataFrame with each dimension in the segments, the start date of + ## each scalar adjustment, and the value of that adjustment + self.adjustments_dataframe = pd.concat(adj_list, ignore_index=True) + + +def parse_scalar_adjustments( + metric_hub_slug: str, forecast_start_date: datetime +) -> List[ScalarAdjustments]: + """ + Parses the SCALAR_ADJUSTMENTS to find the applicable scalar adjustments for a given metric hub slug + and forecast start date. + + Args: + metric_hub_slug (str): The metric hub slug being forecasted. It must be present by name in the + scalar_adjustments.yaml. + forecast_start_date (str): The first date being forecasted. Used here to map to the correct scalar + adjustments as the adjustments will be updated over time. + + Returns: + List[ScalarAdjustments]: A list of ScalarAdjustments, where each ScalarAdjustments is a named scalar adjustment with the + dates that the adjustment should be applied for each segment being modeled. + """ + metric_adjustments = getattr(SCALAR_ADJUSTMENTS.data, metric_hub_slug) + if not metric_adjustments: + raise KeyError(f"No adjustments found for {metric_hub_slug} in {SCALAR_PATH}.") + + # Creates a list of ScalarAdjustments objects that apply for this metric and forecast_start_date + applicable_adjustments = [] + for named_adjustment in metric_adjustments: + parsed_named_adjustments = [ + ScalarAdjustments(named_adjustment.name, adj_dotmap) + for adj_dotmap in named_adjustment.adjustments + ] + + # Sort list of parsed adjustments by forecast_start_date + sorted_parsed_named_adjustments = sorted( + parsed_named_adjustments, key=lambda d: d.forecast_start_date + ) + + # Iterate over the sorted list to find any adjustments that apply after the supplied forecast_start_date. + ## Returns `None` if no applicable value is found + matched_adjustment = None + for parsed_adjustment in sorted_parsed_named_adjustments: + if forecast_start_date >= parsed_adjustment.forecast_start_date: + matched_adjustment = parsed_adjustment + + if matched_adjustment: + applicable_adjustments.append(matched_adjustment) + + return applicable_adjustments diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml new file mode 100644 index 00000000..45ac6026 --- /dev/null +++ b/jobs/kpi-forecasting/kpi_forecasting/configs/model_inputs/scalar_adjustments.yaml @@ -0,0 +1,162 @@ +--- +search_forecasting_revenue_per_ad_click: + - name: "year_over_year_growth" + description: "Estimate of YoY growth in RPC, from input from stakeholders." 
+    adjustments:
+      - forecast_start_date: "2024-01-01"
+        segments:
+          - segment:
+              {
+                partner: "Google",
+                country: "US",
+                device: "desktop",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.03
+          - segment:
+              {
+                partner: "Google",
+                country: "ROW",
+                device: "desktop",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.04
+          - segment:
+              {
+                partner: "Google",
+                country: "ROW",
+                device: "mobile",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.04
+      - forecast_start_date: "2024-04-01"
+        segments:
+          - segment:
+              {
+                partner: "Google",
+                country: "US",
+                device: "desktop",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.10
+          - segment:
+              {
+                partner: "Google",
+                country: "ROW",
+                device: "desktop",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.10
+          - segment:
+              {
+                partner: "Google",
+                country: "ROW",
+                device: "mobile",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.04
+      - forecast_start_date: "2024-05-01"
+        segments:
+          - segment:
+              {
+                partner: "Google",
+                country: "US",
+                device: "desktop",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.10
+              - start_date: "2024-08-01"
+                value: 1.03
+          - segment:
+              {
+                partner: "Google",
+                country: "ROW",
+                device: "desktop",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.10
+              - start_date: "2024-08-01"
+                value: 1.04
+          - segment:
+              {
+                partner: "Google",
+                country: "ROW",
+                device: "mobile",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.04
+      - forecast_start_date: "2024-06-01"
+        segments:
+          - segment:
+              {
+                partner: "Google",
+                country: "US",
+                device: "desktop",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.10
+              - start_date: "2024-08-01"
+                value: 1.03
+          - segment:
+              {
+                partner: "Google",
+                country: "ROW",
+                device: "desktop",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-01-01"
+                value: 1.10
+              - start_date: "2024-08-01"
+                value: 1.04
+
+search_forecasting_ad_clicks:
+  - name: "project_denali"
+    description: "Estimate of ad click impact from Project Denali on mobile in May 2024."
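+    # Note: the 1.0 entries below return the scalar to neutral from 2024-06-01
+    # onward, limiting the uplift to May 2024.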
+    adjustments:
+      - forecast_start_date: "2024-05-01"
+        segments:
+          - segment:
+              {
+                partner: "Google",
+                country: "US",
+                device: "mobile",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-05-01"
+                value: 1.15
+              - start_date: "2024-06-01"
+                value: 1.0
+          - segment:
+              {
+                partner: "Google",
+                country: "ROW",
+                device: "mobile",
+                channel: "all",
+              }
+            adjustments:
+              - start_date: "2024-05-01"
+                value: 1.45
+              - start_date: "2024-06-01"
+                value: 1.0
diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjusted.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjusted.yaml
new file mode 100644
index 00000000..e1e140bc
--- /dev/null
+++ b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_ad_clicks_adjusted.yaml
@@ -0,0 +1,33 @@
+---
+forecast_data_pull:
+  app_name: "multi_product"
+  slug: "search_forecasting_ad_clicks"
+  alias: "search_forecasting_ad_clicks_adjusted"
+  start_date: "2020-01-01"
+  forecast_start_date: "2024-05-01"
+  forecast_project: "mozdata"
+  forecast_dataset: "revenue_cat3_analysis"
+  forecast_table: "search_revenue_forecast_stage"
+  segments:
+    device: "device"
+    channel: "channel"
+    country: "country"
+    partner: "partner"
+  where: "aggregation_period = 'day'"
+
+forecast_model:
+  model_type: "scalar"
+  start_date: NULL
+  end_date: NULL
+  use_holidays: False
+  parameters:
+    formula: "search_forecasting_ad_clicks * scalar"
+
+summarize:
+  requires_summarization: True
+  periods: ["day", "month", "year"]
+
+write_results:
+  project: "moz-fx-data-shared-prod"
+  dataset: "revenue_derived"
+  table: "search_revenue_forecasts_v1"
diff --git a/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml
new file mode 100644
index 00000000..a8978b4b
--- /dev/null
+++ b/jobs/kpi-forecasting/kpi_forecasting/configs/search_forecasting_revenue_per_ad_click.yaml
@@ -0,0 +1,30 @@
+---
+metric_hub:
+  app_name: "multi_product"
+  slug: "search_forecasting_revenue_per_ad_click"
+  alias: "search_forecasting_revenue_per_ad_click"
+  start_date: "2020-01-01"
+  end_date: "last complete month"
+  segments:
+    device: "device"
+    channel: "'all'"
+    country: "CASE WHEN country = 'US' THEN 'US' ELSE 'ROW' END"
+    partner: "partner_name"
+  where: "partner_name = 'Google'"
+
+forecast_model:
+  model_type: "scalar"
+  start_date: NULL
+  end_date: NULL
+  use_holidays: False
+  parameters:
+    formula: "search_forecasting_revenue_per_ad_click:YOY * scalar"
+
+summarize:
+  requires_summarization: False
+  periods: ["month"]
+
+write_results:
+  project: "moz-fx-data-shared-prod"
+  dataset: "revenue_derived"
+  table: "search_revenue_forecasts_v1"
diff --git a/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py b/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py
index 64cf9d42..a2381602 100644
--- a/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py
+++ b/jobs/kpi-forecasting/kpi_forecasting/metric_hub.py
@@ -1,6 +1,7 @@
 import pandas as pd
 
 from dataclasses import dataclass
+from datetime import date, timedelta
 from dotmap import DotMap
 from google.cloud import bigquery
 from mozanalysis.config import ConfigLoader
@@ -10,14 +11,15 @@
 
 
 @dataclass
-class MetricHub:
+class BaseDataPull:
     """
-    Programatically get Metric Hub metrics from Big Query.
-    See https://mozilla.github.io/metric-hub/metrics/ for a list of metrics.
+    A base class to pull data from BigQuery. For use with the forecast classes in
+    this module, a `fetch` method must be implemented.
 
     Args:
-        app_name (str): The Metric Hub app name for the metric.
-        slug (str): The Metric Hub slug for the metric.
+        app_name (str): The app name that applies to the metric being retrieved.
+        slug (str): A slug for the metric, intended to mimic the nomenclature used for
+            metrics on Metric Hub.
         start_date (str): A 'YYYY-MM-DD' formatted-string that specifies the first
             date the metric should be queried.
         segments (Dict): A dictionary of segments to use to group metric values.
@@ -31,6 +33,14 @@ class MetricHub:
             'daily_active_users'.
         project (str): The Big Query project to use when establishing a connection to
             the Big Query client.
+        forecast_start_date (str): The forecast_start_date to use as the key to pull
+            forecast data.
+        forecast_project (str): The BigQuery project where the forecast table to be
+            accessed is located.
+        forecast_dataset (str): For pulling forecast data, the dataset where the
+            forecast data is stored in BigQuery.
+        forecast_table (str): The BigQuery table where past forecast data is stored.
     """
 
     app_name: str
@@ -41,6 +51,21 @@ class MetricHub:
     end_date: str = None
     alias: str = None
     project: str = "mozdata"
+    forecast_start_date: str = None
+    forecast_project: str = None
+    forecast_dataset: str = None
+    forecast_table: str = None
+
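+    # Subclasses must implement `fetch`; it should return a DataFrame with a
+    ## date-typed "submission_date" column and set self.min_date / self.max_date,
+    ## which the forecast models read when adding metadata.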
+    def fetch(self) -> pd.DataFrame:
+        raise NotImplementedError
+
+
+@dataclass
+class MetricHub(BaseDataPull):
+    """
+    Programmatically get Metric Hub metrics from Big Query.
+    See https://mozilla.github.io/metric-hub/metrics/fenix/ for a list of metrics.
+    """
 
     def __post_init__(self) -> None:
         self.start_date = pd.to_datetime(self.start_date).date()
@@ -102,13 +127,109 @@ def fetch(self) -> pd.DataFrame:
         df = bigquery.Client(project=self.project).query(self.query()).to_dataframe()
 
         # ensure submission_date has type 'date'
-        df[self.submission_date_column] = pd.to_datetime(
-            df[self.submission_date_column]
-        ).dt.date
+        df["submission_date"] = pd.to_datetime(df["submission_date"]).dt.date
+
+        # Track the min and max dates in the data, which may differ from the
+        # start/end dates
+        self.min_date = str(df["submission_date"].min())
+        self.max_date = str(df["submission_date"].max())
+
+        return df
+
+
+@dataclass
+class ForecastDataPull(BaseDataPull):
+    """
+    Programmatically get metrics from Big Query forecast data tables. The tables
+    must follow the schema patterns found in the forecast tables produced by the
+    `write_results` methods of the model classes in this module.
+    """
+
+    def __post_init__(self) -> None:
+        self.start_date = pd.to_datetime(self.start_date).date()
+
+        if self.end_date:
+            self.end_date = pd.to_datetime(parse_end_date(self.end_date)).date()
+        else:
+            # The default forecast horizon is 18 months. The end date here is extended
+            ## to 36 months, to cover all current use cases
+            self.end_date = pd.to_datetime(
+                date.today() + timedelta(days=365 * 3)
+            ).date()
+
+        self.alias = self.alias or (self.slug + "_adjusted")
+
+        # The default submission_date column name is "submission_date". This could be
+        ## altered to accept an input, but there is no current need
+        self.submission_date_column = "submission_date"
+
+        self.from_expression = (
+            f"{self.project}.{self.forecast_dataset}.{self.forecast_table}"
+        )
+
+        # Add query snippets for segments
+        self.segment_select_query = ""
+        self.segment_groupby_query = ""
+
+        if self.segments:
+            segment_select_query = []
+            segments = dict(self.segments)
+            for alias, sql in segments.items():
+                segment_select_query.append(f"        {sql} AS {alias},")
+            self.segment_select_query = "," + "\n        ".join(segment_select_query)
+            self.segment_groupby_query = "," + "\n        ,".join(self.segments.keys())
+
+        self.where = f"AND {self.where}" if self.where else ""
+
+        # Check whether forecast_start_date was supplied. If not, create a string to
+        ## grab the most recent forecast.
+        if not self.forecast_start_date:
+            self.forecast_start_date_snippet = f"""(
+                SELECT
+                    MAX(forecast_start_date)
+                FROM {self.from_expression}
+                WHERE metric_slug = '{self.slug}')"""
+        else:
+            self.forecast_start_date_snippet = f"'{self.forecast_start_date}'"
+
+    def query(self) -> str:
+        """Build a string to query the relevant metric values from Big Query."""
+        return dedent(
+            f"""
+            WITH cte AS (
+                SELECT
+                    {self.submission_date_column} AS submission_date,
+                    forecast_start_date,
+                    ANY_VALUE(value HAVING MAX forecast_trained_at) AS value
+                    {self.segment_select_query}
+                FROM {self.from_expression}
+                WHERE {self.submission_date_column} BETWEEN '{self.start_date}' AND '{self.end_date}'
+                    AND metric_alias = '{self.slug}'
+                    AND forecast_start_date = {self.forecast_start_date_snippet}
+                    {self.where}
+                GROUP BY {self.submission_date_column},
+                    forecast_start_date
+                    {self.segment_groupby_query}
+            )
+            SELECT * EXCEPT (forecast_start_date) FROM cte
+            """
+        )
+
+    def fetch(self) -> pd.DataFrame:
+        """Fetch the relevant metric values from Big Query."""
+        print(
+            f"\nQuerying for the '{self.app_name}.{self.slug}' forecast:"
+            f"\n{self.query()}"
+        )
+        df = bigquery.Client(project=self.project).query(self.query()).to_dataframe()
+
+        # ensure submission_date has type 'date'
+        df["submission_date"] = pd.to_datetime(df["submission_date"]).dt.date
 
         # Track the min and max dates in the data, which may differ from the
         # start/end dates
-        self.min_date = str(df[self.submission_date_column].min())
-        self.max_date = str(df[self.submission_date_column].max())
+        self.min_date = str(df["submission_date"].min())
+        self.max_date = str(df["submission_date"].max())
 
         return df
diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py
index c5d4a980..3ff7dc64 100644
--- a/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py
+++ b/jobs/kpi-forecasting/kpi_forecasting/models/funnel_forecast.py
@@ -15,8 +15,8 @@
 from kpi_forecasting.configs.model_inputs import (
     ProphetHoliday,
     ProphetRegressor,
-    holiday_collection,
-    regressor_collection,
+    HOLIDAY_COLLECTION,
+    REGRESSOR_COLLECTION,
 )
 from kpi_forecasting.models.base_forecast import BaseForecast
 from kpi_forecasting import pandas_extras as pdx
@@ -99,12 +99,12 @@ def __post_init__(self) -> None:
 
             if model_params["holidays"]:
                 holiday_list = [
-                    getattr(holiday_collection.data, h)
+                    getattr(HOLIDAY_COLLECTION.data, h)
                     for h in model_params["holidays"]
                 ]
             if model_params["regressors"]:
                 regressor_list = [
-                    getattr(regressor_collection.data, r)
+                    getattr(REGRESSOR_COLLECTION.data, r)
                     for r in model_params["regressors"]
                 ]
@@ -405,8 +405,18 @@ def _predict(self, segment_settings: SegmentModelSettings) -> pd.DataFrame:
         # error rates and how components resulted in those predictions. The `fillna`
         # call will fill the missing y values for forecasted dates, where only yhat
         # is available.
+
+        segment_historical_indices = (
+            self.observed_df[list(segment_settings.segment)]
+            == pd.Series(segment_settings.segment)
+        ).all(axis=1)
+
+        observed_y = self.observed_df.loc[(segment_historical_indices)].rename(
+            columns=self.column_names_map
+        )[["ds", "y"]]
+        observed_y["ds"] = pd.to_datetime(observed_y["ds"])
         components_df = components_df.merge(
-            segment_settings.segment_model.history[["ds", "y"]],
+            observed_y,
             on="ds",
             how="left",
         ).fillna(0)
diff --git a/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py
new file mode 100644
index 00000000..ac03f895
--- /dev/null
+++ b/jobs/kpi-forecasting/kpi_forecasting/models/scalar_forecast.py
@@ -0,0 +1,489 @@
+from dataclasses import dataclass
+from datetime import datetime
+import re
+from typing import Dict, List
+
+from google.cloud import bigquery
+from google.cloud.bigquery.enums import SqlTypeNames as bq_types
+import numpy as np
+import pandas as pd
+
+from kpi_forecasting import pandas_extras as pdx
+from kpi_forecasting.configs.model_inputs import parse_scalar_adjustments
+from kpi_forecasting.models.base_forecast import BaseForecast
+
+
+@dataclass
+class ScalarForecast(BaseForecast):
+    """
+    A forecast class for generating and managing models where forecasts are scalar
+    adjustments of historical data or of preceding Prophet-based forecasts. The
+    class handles cases where forecasts for a combination of dimensions are required
+    for a metric.
+
+    Inherits from BaseForecast and provides methods for initializing forecast
+    parameters, building models, generating forecasts, summarizing results,
+    and writing results to BigQuery.
+    """
+
+    def __post_init__(self) -> None:
+        """
+        Post-initialization method to set up necessary attributes and configurations.
+
+        This method sets up the dates to predict, constructs segment combinations,
+        initializes models for each segment, and prepares attributes for storing results.
+        """
+        super().__post_init__()
+
+        # For monthly-level data, the start date must be adjusted to the first full
+        ## month after the observed df's last date. Otherwise, the first forecast date
+        ## would be the first day after, rather than the first month after, the
+        ## historical data.
+        if all(pd.to_datetime(self.observed_df["submission_date"]).dt.day == 1):
+            self.start_date = self._default_start_date_monthly
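+            ## (e.g. with monthly data ending 2024-04-01, the first forecast month
+            ## becomes 2024-05-01 rather than 2024-04-02)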
+
+        if self.metric_hub is None:
+            # this is used to avoid the code below for testing purposes
+            return
+
+        # Get the list of adjustments for the metric slug being forecasted. That slug
+        ## must be a key in scalar_adjustments.yaml; otherwise, this raises a KeyError
+        self.scalar_adjustments = parse_scalar_adjustments(
+            self.metric_hub.slug, self.start_date
+        )
+
+        self._prep_class_dataframes(self.observed_df, self.metric_hub.segments.keys())
+
+        # Set up the columns to be used to join the observed_df to the forecast_df in
+        ## subsequent methods
+        self.join_columns = self.combination_df.columns.to_list()
+
+        # Rename the value column to the metric alias name, to enable supporting a
+        ## formula with covariates in the future
+        self.observed_df.rename(columns={"value": self.metric_hub.alias}, inplace=True)
+
+    @property
+    def period_names_map(self) -> Dict[str, pd.DateOffset]:
+        """
+        Map a period-over-period name to an offset to apply to DataFrame date columns.
+
+        Returns:
+            Dict[str, pd.DateOffset]: Mapping of period names (e.g. "YOY") to offsets.
+        """
+        return {"YOY": pd.DateOffset(years=1), "MOM": pd.DateOffset(months=1)}
+
+    @property
+    def _default_start_date_monthly(self) -> pd.Timestamp:
+        """The first month after the last date in the observed dataset."""
+        return self.observed_df["submission_date"].max() + pd.DateOffset(months=1)
+
+    def _prep_class_dataframes(
+        self, observed_df: pd.DataFrame, segment_column_list: List
+    ) -> None:
+        """
+        Prepares the dataframes necessary to identify segment combinations and hold
+        the results of scalar forecasting.
+
+        Args:
+            observed_df (pd.DataFrame): DataFrame containing the observed data to model.
+                Must contain the columns specified in the keys of the segments section
+                of the config.
+            segment_column_list (List): List of columns of observed_df used to determine
+                segments.
+        """
+
+        # Construct a DataFrame containing all combinations of segment values in the observed_df
+        self.combination_df = observed_df[segment_column_list].drop_duplicates()
+
+        # Cross join to the dates_to_predict DataFrame to create a DataFrame that contains
+        ## a row for each forecast date for each segment
+        self.forecast_df = self.dates_to_predict.merge(self.combination_df, how="cross")
+
+    def _parse_formula_for_over_period_changes(self) -> Dict | None:
+        """
+        Find period-over-period metric specifications in the provided formula. If present,
+        create a dict that maps a metric name to a period-over-period change.
+        """
+
+        # Pattern matching the words before and after a colon. This is the standard
+        ## pattern in a formula to denote that a period-over-period change will be
+        ## applied to a metric for a forecast.
+        pattern = r"\b(\w+:\w+)\b"
+        match = re.findall(pattern, self.parameters.formula)
+
+        if match:
+            # Create a dict from the list of colon-separated strings (e.g. "metric_name:YOY").
+            pop_dict = dict(pair.split(":") for pair in match)
+            return pop_dict
+
+        return None
+
+    def _add_scalar_columns(self) -> None:
+        """
+        Adds the scalars to make metric adjustments to the dates specified in the
+        self.scalar_adjustments DataFrames.
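+
+        Example (illustrative): a segment with adjustments
+        [{start_date: "2024-01-01", value: 1.10}, {start_date: "2024-08-01", value: 1.03}]
+        gets a `scalar_<name>` column of 1.10 for forecast dates from 2024-01-01 through
+        2024-07-31, 1.03 from 2024-08-01 onward, and 1 before 2024-01-01 (no adjustment).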
+ """ + + for scalar_adjustment in self.scalar_adjustments: + adj_df = scalar_adjustment.adjustments_dataframe.rename( + columns={"value": f"scalar_{scalar_adjustment.name}"} + ) + self.forecast_df["submission_date"] = pd.to_datetime( + self.forecast_df["submission_date"] + ) + adj_df["start_date"] = pd.to_datetime(adj_df["start_date"]) + # Merge asof to align values based on start dates and dimensions + self.forecast_df = pd.merge_asof( + self.forecast_df.sort_values("submission_date"), + adj_df.sort_values("start_date"), + by=self.join_columns, + left_on="submission_date", + right_on="start_date", + direction="backward", + ) + + # Fill values with submission_date before start_date with np.nan, then replace NaN with + ## 1 to not apply any scalar for dates that don't apply or for segments without that + ## scalar + self.forecast_df[f"scalar_{scalar_adjustment.name}"] = np.where( + self.forecast_df["submission_date"] < self.forecast_df["start_date"], + np.nan, + self.forecast_df[f"scalar_{scalar_adjustment.name}"], + ) + + # Fill scalar column with 1. Scalars are always multiplicative, so this removes the scalar effect + ## for dates/segments where it shouldn't apply + self.forecast_df[f"scalar_{scalar_adjustment.name}"].fillna(1, inplace=True) + + # Drop the start_date column that isn't needed for forecasting and can be reused for multiple + ## metrics + self.forecast_df.drop(columns=["start_date"], inplace=True) + + def _fit(self) -> None: + # Create period-over-period dict, which defines how observed data is carried forward in cases + ## where the forecast is a scalar * previously observed data + pop_dict = self._parse_formula_for_over_period_changes() + if pop_dict: + for metric, period in pop_dict.items(): + metric_pop_name = f"{metric}_{period}" + + # Create date column in the forecast_df with the specified date offset + ## in order to merge in observed data from that period + offset = self.period_names_map[period] + self.forecast_df[f"{metric_pop_name}_date"] = ( + self.forecast_df["submission_date"] - offset + ) + + self.forecast_df[f"{metric_pop_name}_date"] = pd.to_datetime( + self.forecast_df[f"{metric_pop_name}_date"] + ) + self.observed_df["submission_date"] = pd.to_datetime( + self.observed_df["submission_date"] + ) + + # Merge observed data to be used in adjustments + self.forecast_df = self.forecast_df.merge( + self.observed_df[ + [*self.join_columns, "submission_date", metric] + ].rename(columns={"submission_date": "join_date"}), + how="left", + left_on=[*self.join_columns, f"{metric_pop_name}_date"], + right_on=[*self.join_columns, "join_date"], + ) + + # Remove unneeded date column + self.forecast_df.drop( + columns=[f"{metric_pop_name}_date", "join_date"], inplace=True + ) + + # For cases where period-over-period change isn't defined, copy over the observed_df values into + ## the forecast_df. Check for values in the forecast period and raise an error if it's filled with + ## nan. 
+        else:
+            self.forecast_df = self.forecast_df.merge(
+                self.observed_df[
+                    [*self.join_columns, self.metric_hub.alias, "submission_date"]
+                ],
+                how="left",
+                on=[*self.join_columns, "submission_date"],
+            )
+            # The forecast data should have no nan values
+            if (
+                self.forecast_df.loc[
+                    self.forecast_df["submission_date"].isin(
+                        pd.date_range(
+                            pd.to_datetime(self.start_date),
+                            pd.to_datetime(self.end_date),
+                        )
+                    ),
+                    self.metric_hub.alias,
+                ]
+                .isnull()
+                .sum()
+                > 0
+            ):
+                raise ValueError("Found nan values in forecast values.")
+
+        # Update the forecast_df with scalar columns
+        self._add_scalar_columns()
+
+    def _predict(self) -> None:
+        # Create the final scalar as the product of the individual scalar effects
+        self.forecast_df["scalar"] = self.forecast_df[
+            [c for c in self.forecast_df.columns if "scalar_" in c]
+        ].prod(axis=1)
+
+        # Calculate the forecast as the product of the scalar value and the observed value
+        self.forecast_df["value"] = (
+            self.forecast_df["scalar"] * self.forecast_df[self.metric_hub.alias]
+        )
+
+        # Record each scalar value in a dictionary to store in the model records
+        self.forecast_df["forecast_parameters"] = self.forecast_df[
+            [c for c in self.forecast_df.columns if "scalar" in c]
+        ].to_dict(orient="records")
+
+        self.observed_df.rename(columns={self.metric_hub.alias: "value"}, inplace=True)
+
+    def _summarize(
+        self,
+        period: str,
+    ) -> pd.DataFrame:
+        """
+        Calculate summary metrics for `forecast_df` over a given period, and add metadata.
+
+        Args:
+            period (str): The period (day, month, or year) over which to aggregate.
+
+        Returns:
+            pd.DataFrame: The summarized dataframe.
+        """
+
+        df_list = []
+        for _, segment_row in self.combination_df.iterrows():
+            # find indices in observed_df for rows that exactly match the segment dict
+            segment_historical_indices = (
+                self.observed_df[segment_row.index.to_list()] == segment_row
+            ).all(axis=1)
+
+            segment_forecast_indices = (
+                self.forecast_df[segment_row.index.to_list()] == segment_row
+            ).all(axis=1)
+
+            # aggregate the metric to the correct date period (day, month, year)
+            observed_summarized = pdx.aggregate_to_period(
+                (
+                    self.observed_df.loc[
+                        (segment_historical_indices)
+                        & (
+                            pd.to_datetime(self.observed_df["submission_date"])
+                            < self.start_date
+                        ),
+                        ["submission_date", "value"],
+                    ].copy()
+                ),
+                period,
+            )
+            forecast_agg = pdx.aggregate_to_period(
+                (
+                    self.forecast_df.loc[
+                        (segment_forecast_indices),
+                        ["submission_date", "value"],
+                    ].copy()
+                ),
+                period,
+            )
+
+            # find periods of overlap between observed and forecasted data
+            forecast_with_overlap = forecast_agg.merge(
+                observed_summarized,
+                on="submission_date",
+                how="left",
+                suffixes=("_forecast", "_observed"),
+            ).fillna(0)
+            forecast_with_overlap["value"] = forecast_with_overlap[
+                ["value_forecast", "value_observed"]
+            ].sum(axis=1)
+
+            # add datasource-specific metadata columns
+            forecast_with_overlap["source"] = "forecast"
+            observed_summarized["source"] = "historical"
+
+            # create a single dataframe that contains observed and forecasted data
+            df = pd.concat([observed_summarized, forecast_with_overlap])
+
+            # add summary metadata columns
+            df["aggregation_period"] = period.lower()
+
+            # add the expected percentile fields to the df
+            df["value_low"] = df["value"]
+            df["value_mid"] = df["value"]
+            df["value_high"] = df["value"]
+
+            # reorder columns to make interpretation easier
+            df = df[
+                [
+                    "submission_date",
+                    "aggregation_period",
+                    "source",
+                    "value",
"value_low", + "value_mid", + "value_high", + ] + ] + + # add segment columns to table + for dim, value in zip(segment_row.index, segment_row.values): + df[dim] = value + + # add Metric Hub metadata columns + df["metric_alias"] = self.metric_hub.alias.lower() + df["metric_hub_app_name"] = self.metric_hub.app_name.lower() + df["metric_hub_slug"] = self.metric_hub.slug.lower() + df["metric_start_date"] = pd.to_datetime(self.metric_hub.min_date) + df["metric_end_date"] = pd.to_datetime(self.metric_hub.max_date) + df["metric_collected_at"] = self.collected_at + + # add forecast model metadata columns + df["forecast_start_date"] = self.start_date + df["forecast_end_date"] = self.end_date + df["forecast_trained_at"] = self.trained_at + df["forecast_predicted_at"] = self.predicted_at + + df_list.append(df.copy()) + + return pd.concat(df_list) + + def _add_summary_metadata(self, periods: List[str] | str): + """ + In cases where no summarization is required, adds the expected columns to a summary DataFrame. + """ + if isinstance(periods, list): + if len(periods) > 1: + raise ValueError( + "Can only supply one aggregation period when not summarizing results." + ) + period = periods[0] + + union_cols = [ + "submission_date", + *self.join_columns, + "value", + ] + df = pd.concat([self.forecast_df[union_cols], self.observed_df[union_cols]]) + + df["source"] = np.where( + df["submission_date"] < self.start_date, + "historical", + "forecast", + ) + df["measure"] = np.where( + df["submission_date"] < self.start_date, + "observed", + "forecast", + ) + + df["submission_date"] = pd.to_datetime(df["submission_date"]) + df["aggregation_period"] = period + # add Metric Hub metadata columns + df["metric_alias"] = self.metric_hub.alias.lower() + df["metric_hub_app_name"] = self.metric_hub.app_name.lower() + df["metric_hub_slug"] = self.metric_hub.alias.lower() + df["metric_start_date"] = pd.to_datetime(self.metric_hub.min_date) + df["metric_end_date"] = pd.to_datetime(self.metric_hub.max_date) + df["metric_collected_at"] = self.collected_at + + # add forecast model metadata columns + df["forecast_start_date"] = pd.to_datetime(self.start_date) + df["forecast_end_date"] = self.end_date + df["forecast_trained_at"] = self.trained_at + df["forecast_predicted_at"] = self.predicted_at + + # add other value percentile columns expected from Prophet-based forecasts. Include just the + ## value as these percentiles. Coule be replaced in the future with the option to pass multiple + ## scenarios to scalar forecasts. + df["value_low"] = df["value"] + df["value_mid"] = df["value"] + df["value_high"] = df["value"] + + self.summary_df = df.dropna(subset=["value"]) + + def predict(self) -> None: + """Generate a forecast from `start_date` to `end_date`.""" + print(f"Forecasting from {self.start_date} to {self.end_date}.", flush=True) + self._set_seed() + self.predicted_at = datetime.utcnow() + self._predict() + + def summarize( + self, + requires_summarization: bool = True, + periods: List[str] | str = ["day", "month"], + ) -> None: + """ + There are cases where forecasts created by this class do not require summarization (e.g. 
+        do not require summarization (e.g. the scalar adjustment was made to a prior
+        forecast).
+        """
+        if not requires_summarization:
+            self._add_summary_metadata(periods)
+
+        else:
+            # If summarization is required, use the summarization method
+            self.summary_df = pd.concat([self._summarize(i) for i in periods])
+
+    def write_results(
+        self,
+        project: str,
+        dataset: str,
+        table: str,
+        write_disposition: str = "WRITE_APPEND",
+    ) -> None:
+        """
+        Write `self.summary_df` to Big Query.
+
+        Args:
+            project (str): The Big Query project that the data should be written to.
+            dataset (str): The Big Query dataset that the data should be written to.
+            table (str): The Big Query table that the data should be written to.
+            write_disposition (str, optional): In the event that the destination table
+                exists, should the table be overwritten ("WRITE_TRUNCATE") or appended
+                to ("WRITE_APPEND")? Defaults to "WRITE_APPEND".
+        """
+        print(
+            f"Writing results to `{project}.{dataset}.{table}`.",
+            flush=True,
+        )
+        client = bigquery.Client(project=project)
+        schema = [
+            bigquery.SchemaField("submission_date", bq_types.DATE),
+            *[bigquery.SchemaField(k, bq_types.STRING) for k in self.join_columns],
+            bigquery.SchemaField("aggregation_period", bq_types.STRING),
+            bigquery.SchemaField("source", bq_types.STRING),
+            bigquery.SchemaField("value", bq_types.FLOAT),
+            bigquery.SchemaField("value_low", bq_types.FLOAT),
+            bigquery.SchemaField("value_mid", bq_types.FLOAT),
+            bigquery.SchemaField("value_high", bq_types.FLOAT),
+            bigquery.SchemaField("metric_alias", bq_types.STRING),
+            bigquery.SchemaField("metric_hub_app_name", bq_types.STRING),
+            bigquery.SchemaField("metric_hub_slug", bq_types.STRING),
+            bigquery.SchemaField("metric_start_date", bq_types.DATE),
+            bigquery.SchemaField("metric_end_date", bq_types.DATE),
+            bigquery.SchemaField("metric_collected_at", bq_types.TIMESTAMP),
+            bigquery.SchemaField("forecast_start_date", bq_types.DATE),
+            bigquery.SchemaField("forecast_end_date", bq_types.DATE),
+            bigquery.SchemaField("forecast_trained_at", bq_types.TIMESTAMP),
+            bigquery.SchemaField("forecast_predicted_at", bq_types.TIMESTAMP),
+        ]
+        job = client.load_table_from_dataframe(
+            dataframe=self.summary_df,
+            destination=f"{project}.{dataset}.{table}",
+            job_config=bigquery.LoadJobConfig(
+                schema=schema,
+                autodetect=False,
+                write_disposition=write_disposition,
+            ),
+        )
+        # Wait for the job to complete.
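+        ## result() blocks until the load job finishes and raises on failure, so any
+        ## mismatch with the explicit schema above surfaces here.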
+        job.result()
diff --git a/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py b/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py
new file mode 100644
index 00000000..cc24c8a7
--- /dev/null
+++ b/jobs/kpi-forecasting/kpi_forecasting/tests/test_scalar_forecast.py
@@ -0,0 +1,114 @@
+import pytest
+from unittest.mock import patch, MagicMock
+
+import pandas as pd
+
+from kpi_forecasting.models.scalar_forecast import ScalarForecast
+
+
+@pytest.fixture
+def setup_forecast():
+    observed_df = pd.DataFrame(
+        {
+            "submission_date": pd.date_range(start="2020-01-01", periods=6, freq="M"),
+            "value": [100, 150, 200, 250, 300, 350],
+            "segment": ["A", "A", "A", "B", "B", "B"],
+        }
+    )
+
+    metric_hub = MagicMock()
+    metric_hub.slug = "metric_slug"
+    metric_hub.segments = {"segment": ["A", "B"]}
+    metric_hub.alias = "metric_alias"
+    metric_hub.app_name = "app_name"
+    metric_hub.min_date = "2019-01-01"
+    metric_hub.max_date = "2020-06-01"
+
+    start_date = "2020-07-01"
+    end_date = "2020-12-31"
+    scalar_adjustments = [MagicMock()]
+    parameters = MagicMock()
+    parameters.formula = "metric:YOY + metric2:MOM"
+
+    forecast = ScalarForecast(
+        model_type="scalar",
+        parameters=parameters,
+        use_holidays=False,
+        start_date=start_date,
+        end_date=end_date,
+        metric_hub=metric_hub,
+    )
+
+    forecast.observed_df = observed_df
+    forecast.scalar_adjustments = scalar_adjustments
+
+    return forecast
+
+
+def test_post_init(setup_forecast):
+    forecast = setup_forecast
+    assert forecast.start_date == "2020-07-01"
+    assert len(forecast.scalar_adjustments) == 1
+    assert list(forecast.combination_df.columns) == ["segment"]
+
+
+def test_period_names_map(setup_forecast):
+    forecast = setup_forecast
+    assert forecast.period_names_map == {
+        "YOY": pd.DateOffset(years=1),
+        "MOM": pd.DateOffset(months=1),
+    }
+
+
+def test_parse_formula_for_over_period_changes(setup_forecast):
+    forecast = setup_forecast
+    result = forecast._parse_formula_for_over_period_changes()
+    assert result == {"metric": "YOY", "metric2": "MOM"}
+
+
+def test_add_scalar_columns(setup_forecast):
+    forecast = setup_forecast
+    forecast.forecast_df = forecast.dates_to_predict.merge(
+        forecast.combination_df, how="cross"
+    )
+    forecast._add_scalar_columns()
+    assert "scalar_mock" in forecast.forecast_df.columns
+
+
+def test_fit(setup_forecast):
+    forecast = setup_forecast
+    with patch.object(
+        forecast, "_parse_formula_for_over_period_changes", return_value=None
+    ), patch.object(forecast, "_add_scalar_columns"):
+        forecast._fit()
+    assert forecast.metric_hub.alias in forecast.forecast_df.columns
+    assert not forecast.forecast_df[forecast.metric_hub.alias].isnull().any()
+
+
+def test_predict(setup_forecast):
+    forecast = setup_forecast
+    with patch.object(forecast, "_set_seed"), patch.object(forecast, "_predict"):
+        forecast.predict()
+    assert forecast.predicted_at is not None
+
+
+def test_summarize(setup_forecast):
+    forecast = setup_forecast
+    with patch.object(
+        forecast, "_summarize", return_value=pd.DataFrame()
+    ), patch.object(forecast, "_add_summary_metadata"):
+        forecast.summarize(requires_summarization=False)
+    assert forecast.summary_df is not None
+
+
+@patch("kpi_forecasting.models.scalar_forecast.bigquery.Client")
+def test_write_results(mock_client, setup_forecast):
+    forecast = setup_forecast
+    mock_client_instance = mock_client.return_value
+    mock_load_job = MagicMock()
+    mock_client_instance.load_table_from_dataframe.return_value = mock_load_job
+    mock_load_job.result.return_value = None
+
+    forecast.summary_df = pd.DataFrame(
{"submission_date": ["2020-07-01"], "value": [100]} + ) + + forecast.write_results("project", "dataset", "table") + mock_client_instance.load_table_from_dataframe.assert_called_once()