diff --git a/pyfixest/estimation/models/feglm_.py b/pyfixest/estimation/models/feglm_.py index afd7c2328..4357048cf 100644 --- a/pyfixest/estimation/models/feglm_.py +++ b/pyfixest/estimation/models/feglm_.py @@ -108,6 +108,7 @@ def __init__( self._Xbeta = np.empty(0) self._method = "feglm" + self._supports_sensitivity_analysis = False def prepare_model_matrix(self): "Prepare model inputs for estimation." diff --git a/pyfixest/estimation/models/feols_.py b/pyfixest/estimation/models/feols_.py index fe6299edc..90f154368 100644 --- a/pyfixest/estimation/models/feols_.py +++ b/pyfixest/estimation/models/feols_.py @@ -54,6 +54,7 @@ _get_ritest_stats_slow, _plot_ritest_pvalue, ) +from pyfixest.estimation.sensitivity import SensitivityAnalysis from pyfixest.utils.dev_utils import ( DataFrameType, _extract_variable_level, @@ -402,6 +403,7 @@ def _not_implemented_did(*args, **kwargs): self.test_treatment_heterogeneity = _not_implemented_did self.aggregate = _not_implemented_did self.iplot_aggregate = _not_implemented_did + self._supports_sensitivity_analysis = True def prepare_model_matrix(self): "Prepare model matrices for estimation." @@ -2376,6 +2378,14 @@ def update( return beta_n_plus_1 + def sensitivity_analysis(self) -> SensitivityAnalysis: + """Create a sensitivity analysis object for OLS models.""" + if not self._supports_sensitivity_analysis: + raise ValueError( + "Sensitivity Analysis is only supported for OLS methods (Feols)." + ) + return SensitivityAnalysis(self) + def _feols_input_checks(Y: np.ndarray, X: np.ndarray, weights: np.ndarray): """ diff --git a/pyfixest/estimation/sensitivity.py b/pyfixest/estimation/sensitivity.py new file mode 100644 index 000000000..504044037 --- /dev/null +++ b/pyfixest/estimation/sensitivity.py @@ -0,0 +1,645 @@ +from dataclasses import dataclass +from typing import Any, Optional, Union + +import numpy as np +import pandas as pd +from scipy.stats import t + + +@dataclass +class SensitivityAnalysis: + """ + Implements the sensitivity analysis method described in Cinelli and Hazlett (2020): "Making Sense of Sensitivity: Extending Omitted Variable Bias". + + This class performs the analysis, creates the benchmarks and supports visualizations and output creation. + + Parameters + ---------- + model: pyfixest.Feols + A fitted `pyfixest` model object (e.g., from `feols()`). + X: str, Optional + The name of the treatment variable to analyze by default. If None, operations requiring a treatment variable must specify it explicitly. + """ + + model: Any + X: Optional[str] = None + + def __post_init__(self): + if not getattr(self.model, "_supports_sensitivity_analysis", False): + raise ValueError( + "Sensitivity analysis is only supported for OLS models (Feols)." + ) + + def partial_r2(self, X: Optional[str] = None) -> Union[float, np.ndarray]: + """ + Calculate the partial R2 for a given variable. + + The partial R2 explains how much of the residual variance of the outcome is explained by the covariate. + + Parameters + ---------- + X: str, Optional + The name of the covariate for which to compute the partial R2. If None, returns partial R2s for all covariates in the model. + + Returns + ------- + float or np.ndarray + The partial R2 value(s). + """ + df = self.model._df_t + names = self.model._coefnames + tstat = self.model._tstat + + if X is None: + return tstat**2 / (tstat**2 + df) + + idx = names.index(X) + return tstat[idx] ** 2 / (tstat[idx] ** 2 + df) + + def partial_f2(self, X: Optional[str] = None) -> Union[float, np.ndarray]: + """ + Compute the partial (Cohen's) f2 for a linear regression model. + + The partial f2 is a measure of effect size (a transformation of the partial R2). + + Parameters + ---------- + X: str, Optional + The name of the covariate for which to compute the partial R2. If None, returns partial R2s for all covariates in the model. + + Returns + ------- + float or np.ndarray + The partial f2 value(s). + """ + df = self.model._df_t + names = self.model._coefnames + tstat = self.model._tstat + + if X is None: + return tstat**2 / df + + idx = names.index(X) + return tstat[idx] ** 2 / df + + def robustness_value( + self, X: Optional[str] = None, q: float = 1, alpha: float = 1.0 + ) -> Union[float, np.ndarray]: + """ + Compute the robustness value (RV) of the regression coefficient. + + The RV describes the minimum strength of association (partial R2) that an unobserved confounder would need to have with both the treatment and the outcome to change the research conclusions (e.g., reduce effect by q% or render it insignificant). + + Parameters + ---------- + X : str, optional + The name of the covariate. + q : float, default 1.0 + The proportion of reduction in the coefficient estimate (e.g., 1.0 = 100% reduction to zero). + alpha : float, default 1.0 + The significance level. If 1.0 (default), computes RV for the point estimate (RV_q). + If < 1.0 (e.g., 0.05), computes RV for statistical significance (RV_qa) + + Returns + ------- + float or np.ndarray + The robustness value(s). + """ + df = self.model._df_t + f2 = self.partial_f2(X=X) + + fq = q * np.sqrt(f2) + f_crit = abs(t.ppf(alpha / 2, df - 1)) / np.sqrt(df - 1) + fqa = fq - f_crit + + rv = np.where(fqa > 0, 0.5 * (np.sqrt(fqa**4 + 4 * fqa**2) - fqa**2), 0.0) + + # check edge cases + edge_case = 1 - (1 / fq**2) + rv = np.where(rv > edge_case, rv, (fq**2 - f_crit**2) / (1 + fq**2)) + + return rv + + def sensitivity_stats( + self, X: Optional[str] = None, q: float = 1, alpha: float = 0.05 + ) -> dict: + """ + Compute the sensitivity statistics for the model. + + Parameters + ---------- + X : str, optional + The name of the covariate. + q : float, default 1.0 + The percent reduction for the Robustness Value. + alpha : float, default 0.05 + The significance level for the Robustness Value. + + Returns + ------- + dict + A dictionary containing: + - 'estimate': Coefficient estimate + - 'se': Standard Error + - 'df': Degrees of Freedom + - 'partial_R2': Partial R2 of the covariate + - 'partial_f2': Partial f2 of the covariate + - 'rv_q': Robustness Value for point estimate + - 'rv_qa': Robustness Value for statistical significance + """ + estimate = self.model._beta_hat + se = self.model._se + df = self.model._df_t + + if X is not None: + idx = self.model._coefnames.index(X) + estimate = estimate[idx] + se = se[idx] + + # compute statistics + r2yd_x = self.partial_r2(X=X) + f2yd_x = self.partial_f2(X=X) + rv_q = self.robustness_value(X=X, q=q, alpha=1) # alpha = 1 makes f_crit = 0 + rv_qa = self.robustness_value(X=X, q=q, alpha=alpha) + + sensitivity_stats_df = { + "estimate": estimate, + "se": se, + "df": df, + "partial_R2": r2yd_x, + "partial_f2": f2yd_x, + "rv_q": rv_q, + "rv_qa": rv_qa, + } + + return sensitivity_stats_df + + def ovb_bounds( + self, + treatment: str, + benchmark_covariates: Union[str, list[str]], + kd: Optional[Union[float, list[float]]] = None, + ky: Optional[Union[float, list[float]]] = None, + alpha: float = 0.05, + adjusted_estimate: bool = True, + bound: str = "partial r2", + ): + """ + Compute bounds on omitted variable bias using observed covariates as benchmarks. + + Parameters + ---------- + treatment : str + The name of the treatment variable. + benchmark_covariates : str or list of str + The names of the observed covariates to use for benchmarking. + kd : float or list of floats, default [1, 2, 3] + The multiplier for the strength of the confounder with the treatment + relative to the benchmark covariate. + ky : float or list of floats, optional + The multiplier for the strength of the confounder with the outcome. + If None, defaults to the same values as `kd`. + alpha : float, default 0.05 + Significance level for computing confidence intervals of the adjusted estimates. + adjusted_estimate : bool, default True + If True, computes the adjusted estimate, SE, t-statistic, and CI. + bound : str, default "partial r2" + The type of bound to compute. Currently only "partial r2" is supported. + + Returns + ------- + pd.DataFrame + A DataFrame containing the sensitivity bounds and (optional) adjusted statistics. + """ + df = self.model._df_t + + if kd is None: + kd = [1, 2, 3] + if ky is None: + ky = kd + + if bound != "partial r2": + raise ValueError("Only partial r2 is implemented as of now.") + + bounds = self._ovb_bounds_partial_r2( + treatment=treatment, benchmark_covariates=benchmark_covariates, kd=kd, ky=ky + ) + + if adjusted_estimate: + bounds["treatment"] = treatment + bounds["adjusted_estimate"] = self.adjusted_estimate( + bounds["r2dz_x"], bounds["r2yz_dx"], treatment=treatment, reduce=True + ) + bounds["adjusted_se"] = self.adjusted_se( + bounds["r2dz_x"], bounds["r2yz_dx"], treatment=treatment + ) + bounds["adjusted_t"] = self.adjusted_t( + bounds["r2dz_x"], + bounds["r2yz_dx"], + treatment=treatment, + reduce=True, + h0=0, + ) + se_multiple = abs(t.ppf(alpha / 2, df)) + bounds["adjusted_lower_CI"] = ( + bounds["adjusted_estimate"] - se_multiple * bounds["adjusted_se"] + ) + bounds["adjusted_upper_CI"] = ( + bounds["adjusted_estimate"] + se_multiple * bounds["adjusted_se"] + ) + + return bounds + + def _ovb_bounds_partial_r2( + self, + treatment: str, + benchmark_covariates: Union[str, list[str]], + kd: Optional[Union[float, list[float]]] = None, + ky: Optional[Union[float, list[float]]] = None, + ): + """ + Compute OVB bounds based on partial R2. + + This function should not be called directly. It is called under the ovb_bounds user facing function. + + Parameters + ---------- + treatment : str + The treatment variable. + benchmark_covariates : str or list + Benchmarks. + kd, ky : float or list + Strength multipliers. + + Returns + ------- + pd.DataFrame + DataFrame with 'r2dz_x' and 'r2yz_dx' columns. + """ + from pyfixest.estimation.api import feols + from pyfixest.estimation.FixestMulti_ import FixestMulti + + model = self.model + if model is None or treatment is None: + raise ValueError( + "ovb_partial_r2 requires a model object and a treatment variable" + ) + if kd is None: + kd = [1, 2, 3] + + data = model._data + X = pd.DataFrame(model._X, columns=model._coefnames) + + if treatment not in X.columns: + raise ValueError( + f"Treatment '{treatment}' not found in model coefficients." + ) + + non_treatment = X.drop(columns=treatment) + covariate_names = [col for col in non_treatment.columns if col != "Intercept"] + covariates = " + ".join(covariate_names) + + formula = f"{treatment} ~ {covariates}" + + treatment_model = feols(formula, data=data) + if isinstance(treatment_model, FixestMulti): + raise TypeError( + "Internal error: expected a single model for treatment regression." + ) + treatment_sens = treatment_model.sensitivity_analysis() + + if isinstance(benchmark_covariates, str): + benchmark_covariates = [benchmark_covariates] + + kd_list = [float(x) for x in kd] if isinstance(kd, list) else [float(kd)] + + if ky is None: + ky_list = kd_list.copy() + elif isinstance(ky, list): + ky_list = [float(x) for x in ky] + else: + ky_list = [float(ky)] + + if len(ky_list) != len(kd_list): + if len(ky_list) == 1: + ky_list = ky_list * len(kd_list) + else: + raise ValueError("ky must have length 1 or the same length as kd.") + + bounds_list = [] + + for b in benchmark_covariates: + r2yxj_dx = self.partial_r2(X=b) + r2dxj_x = treatment_sens.partial_r2(X=b) + + for kd_val, ky_val in zip(kd_list, ky_list): + r2dz_x = kd_val * (r2dxj_x / (1 - r2dxj_x)) + + if r2dz_x >= 1: + raise ValueError( + f"Implied bound on r2dz.x >= 1 for benchmark {b} with kd={kd_val}." + "Impossible scenario. Try a lower kd." + ) + r2zxj_xd = ( + kd_val * (r2dxj_x**2) / ((1 - kd_val * r2dxj_x) * (1 - r2dxj_x)) + ) + + if r2zxj_xd >= 1: + raise ValueError( + f"Impossible kd value for benchmark {b}. Try a lower kd." + ) + + r2yz_dx = ( + ((np.sqrt(ky_val) + np.sqrt(r2zxj_xd)) / np.sqrt(1 - r2zxj_xd)) ** 2 + ) * (r2yxj_dx / (1 - r2yxj_dx)) + + if r2yz_dx > 1: + print( + f"Warning: Implied bound on r2yz.dx > 1 for {b}. Capping at 1." + ) + r2yz_dx = 1.0 + + bounds_list.append( + { + "bound_label": f"{kd_val}x {b}", # Simple label maker + "r2dz_x": r2dz_x, + "r2yz_dx": r2yz_dx, + "benchmark_covariate": b, + "kd": kd_val, + "ky": ky_val, + } + ) + + return pd.DataFrame(bounds_list) + + def bias( + self, + r2dz_x: Union[float, np.ndarray], + r2yz_dx: Union[float, np.ndarray], + treatment: str, + ): + """ + Compute the bias for the partial R2 parametrization. + + Parameters + ---------- + r2dz_x : float or np.ndarray + Partial R2 of confounder with treatment. + r2yz_dx : float or np.ndarray + Partial R2 of confounder with outcome. + treatment : str + The treatment variable. + + Returns + ------- + float or np.ndarray + The estimated bias amount (in units of the coefficient). + """ + df = self.model._df_t + idx = self.model._coefnames.index(treatment) + se = self.model._se[idx] + + r2dz_x, r2yz_dx = np.array(r2dz_x), np.array(r2yz_dx) + bias_factor = np.sqrt((r2yz_dx * r2dz_x) / (1 - r2dz_x)) + + return bias_factor * se * np.sqrt(df) + + def adjusted_estimate( + self, + r2dz_x: Union[float, np.ndarray], + r2yz_dx: Union[float, np.ndarray], + treatment: str, + reduce: bool = True, + ): + """ + Compute the bias-adjusted coefficient estimate. + + Parameters + ---------- + r2dz_x, r2yz_dx : float or np.ndarray + Partial R2 parameters of the confounder. + treatment : str + The treatment variable. + reduce : bool, default True + If True, assumes bias moves the estimate toward zero (conservative). + If False, assumes bias moves estimate away from zero. + + Returns + ------- + float or np.ndarray + The adjusted coefficient. + """ + idx = self.model._coefnames.index(treatment) + estimate = self.model._beta_hat[idx] + + if reduce: + return np.sign(estimate) * ( + abs(estimate) - self.bias(r2dz_x, r2yz_dx, treatment=treatment) + ) + else: + return np.sign(estimate) * ( + abs(estimate) + self.bias(r2dz_x, r2yz_dx, treatment=treatment) + ) + + def adjusted_se( + self, + r2dz_x: Union[float, np.ndarray], + r2yz_dx: Union[float, np.ndarray], + treatment: str, + ): + """ + Compute the bias-adjusted Standard Error estimate. + + Parameters + ---------- + r2dz_x, r2yz_dx : float or np.ndarray + Partial R2 parameters of the confounder. + treatment : str + The treatment variable. + + Returns + ------- + float or np.ndarray + The adjusted standard error. + """ + df = self.model._df_t + idx = self.model._coefnames.index(treatment) + se = self.model._se[idx] + + return np.sqrt((1 - r2yz_dx) / (1 - r2dz_x)) * se * np.sqrt(df / (df - 1)) + + def adjusted_t( + self, + r2dz_x: Union[float, np.ndarray], + r2yz_dx: Union[float, np.ndarray], + treatment: str, + reduce: bool = True, + h0: float = 0, + ): + """ + Compute the bias-adjusted t-statistic. + + Parameters + ---------- + r2dz_x, r2yz_dx : float or np.ndarray + Partial R2 parameters of the confounder. + treatment : str + The treatment variable. + reduce : bool, default True + Whether to reduce the estimate magnitude. + h0 : float, default 0 + The null hypothesis value for the t-test. + + Returns + ------- + float or np.ndarray + The adjusted t-statistic. + """ + new_estimate = self.adjusted_estimate( + r2dz_x, r2yz_dx, treatment=treatment, reduce=reduce + ) + new_se = self.adjusted_se(r2dz_x, r2yz_dx, treatment=treatment) + return (new_estimate - h0) / new_se + + def summary( + self, + treatment: Optional[str] = None, + benchmark_covariates: Optional[Union[str, list[str]]] = None, + kd: Optional[Union[float, list[float]]] = None, + ky: Optional[Union[float, list[float]]] = None, + q: float = 1, + alpha: float = 0.05, + reduce: bool = True, + decimals: int = 3, + ): + """ + Print a summary of the sensitivity analysis. + + Parameters + ---------- + treatment : str + The name of the treatment variable. If None, defaults to the first variable. + benchmark_covariates : list or str, optional + The list of covariates to use for bounding. If provided, the bounds table is printed. + kd : list, optional + Multipliers for the strength of the confounder with the treatment (default [1, 2, 3]). + ky : list, optional + Multipliers for the strength of the confounder with the outcome (default same as kd). + q : float + The percent reduction in the estimate considered problematic (default 1 = 100%). + alpha : float + Significance level for the Robustness Value (default 0.05). + reduce : bool + Whether the bias reduces the absolute value of the estimate (default True). + decimals : int + Number of decimal places to print. + """ + if treatment is None: + treatment = self.model._coefnames[0] + if kd is None: + kd = [1, 2, 3] + + if ky is None: + ky = kd + + formula = self.model._fml + stats = self.sensitivity_stats(X=treatment, q=q, alpha=alpha) + est = stats["estimate"] + se = stats["se"] + t_stat = est / se + partial_r2 = stats["partial_R2"] + rv_q = stats["rv_q"] + rv_qa = stats["rv_qa"] + + print("Sensitivity Analysis to Unobserved Confounding\n") + print(f"Model Formula: {formula}\n") + + print(f"Null hypothesis: q = {q} and reduce = {reduce} ") + + h0 = (1 - q) * est + print( + f"-- The null hypothesis deemed problematic is H0:tau = {h0:.{decimals}f} \n" + ) + + print(f"Unadjusted Estimates of '{treatment}':") + print(f" Coef. estimate: {est:.{decimals}f}") + print(f" Standard Error: {se:.{decimals}f}") + print(f" t-value: {t_stat:.{decimals}f} \n") + + print("Sensitivity Statistics:") + print(f" Partial R2 of treatment with outcome: {partial_r2:.{decimals}f}") + print(f" Robustness Value, q = {q} : {rv_q:.{decimals}f}") + print(f" Robustness Value, q = {q} alpha = {alpha} : {rv_qa:.{decimals}f} \n") + + if benchmark_covariates is not None: + print("Bounds on omitted variable bias:") + + bounds_df = self.ovb_bounds( + treatment=treatment, + benchmark_covariates=benchmark_covariates, + kd=kd, + ky=ky, + alpha=alpha, + adjusted_estimate=True, + ) + + cols = [ + "bound_label", + "r2dz_x", + "r2yz_dx", + "treatment", + "adjusted_estimate", + "adjusted_se", + "adjusted_t", + "adjusted_lower_CI", + "adjusted_upper_CI", + ] + + cols = [c for c in cols if c in bounds_df.columns] + + print( + bounds_df[cols].to_string( + index=True, + float_format=lambda x: f"{x:.{6}f}" + if abs(x) < 1e-3 + else f"{x:.{decimals}f}", + ) + ) + + def plot( + self, + treatment: str, + plot_type: str = "contour", + sensitivity_of: str = "estimate", + benchmark_covariates: Optional[Union[str, list]] = None, + **kwargs, + ): + """ + Provide the contour and extreme sensitivity plots of the sensitivity analysis results. + + Parameters + ---------- + treatment : str + Name of the treatment variable. + plot_type : str, default "contour" + Either "contour" or "extreme". + sensitivity_of : str, default "estimate" + Either "estimate" or "t-value". + **kwargs + Additional arguments passed to plotting functions. + + """ + from pyfixest.report.visualize import ovb_contour_plot, ovb_extreme_plot + + if plot_type == "contour": + return ovb_contour_plot( + sens=self, treatment=treatment, sensitivity_of=sensitivity_of, **kwargs + ) + elif plot_type == "extreme" and sensitivity_of == "t-value": + raise NotImplementedError( + "Extreme sensitivity plots for t-values not yet implemented." + ) + elif plot_type == "extreme": + return ovb_extreme_plot(sens=self, treatment=treatment, **kwargs) + else: + raise ValueError('plot_type must be "contour" or "extreme"') diff --git a/pyfixest/report/visualize.py b/pyfixest/report/visualize.py index bd7e8caab..3752e34c3 100644 --- a/pyfixest/report/visualize.py +++ b/pyfixest/report/visualize.py @@ -5,6 +5,8 @@ import numpy as np import pandas as pd +from pyfixest.estimation.sensitivity import SensitivityAnalysis + # Make lets-plot an optional dependency try: from lets_plot import ( @@ -889,3 +891,418 @@ def _get_model_df( df_model = df_joint_full return df_model + + +def ovb_contour_plot( + sens: "SensitivityAnalysis", + treatment: str, + sensitivity_of: str = "estimate", + benchmark_covariates: Optional[Union[str, list]] = None, + kd: Union[float, list] = 1, + ky: Optional[Union[float, list]] = None, + r2dz_x: Optional[Union[float, list]] = None, + r2yz_dx: Optional[Union[float, list]] = None, + reduce: bool = True, + estimate_threshold: float = 0, + t_threshold: float = 2, + lim: Optional[float] = None, + lim_y: Optional[float] = None, + col_contour: str = "black", + col_thr_line: str = "red", + label_text: bool = True, + label_bump_x: Optional[float] = None, + label_bump_y: Optional[float] = None, + xlab: Optional[str] = None, + ylab: Optional[str] = None, + plot_margin_fraction: float = 0.05, + round_dig: int = 3, + n_levels: Optional[int] = None, + figsize: tuple = (6, 6), + ax: Optional[plt.Axes] = None, +): + """ + Create contour plots of omitted variable bias for sensitivity analysis. + + See Cinelli and Hazlett (2020) for details. + + Parameters + ---------- + sens : SensitivityAnalysis + A SensitivityAnalysis object. + treatment : str + Name of the treatment variable. + sensitivity_of : str, default "estimate" + Either "estimate" or "t-value". + benchmark_covariates : str or list, optional + Covariate(s) for benchmarking. + kd : float or list, default 1 + Multiplier for treatment-side bounds. + ky : float or list, optional + Multiplier for outcome-side bounds. Defaults to kd. + r2dz_x : float or list, optional + Manual partial R2 values for treatment. + r2yz_dx : float or list, optional + Manual partial R2 values for outcome. + reduce : bool, default True + Whether confounding reduces (True) or increases (False) the estimate. + estimate_threshold : float, default 0 + Threshold for estimate contours. + t_threshold : float, default 2 + Threshold for t-value contours. + lim : float, optional + X-axis maximum. Auto-calculated if None. + lim_y : float, optional + Y-axis maximum. Auto-calculated if None. + col_contour : str, default "black" + Color for contour lines. + col_thr_line : str, default "red" + Color for threshold line. + label_text : bool, default True + Whether to show labels on bounds. + label_bump_x : float, optional + X offset for labels. + label_bump_y : float, optional + Y offset for labels. + xlab : str, optional + X-axis label. + ylab : str, optional + Y-axis label. + plot_margin_fraction : float, default 0.05 + Margin fraction for plot edges. + round_dig : int, default 3 + Rounding digits for labels. + n_levels : int, optional + Number of contour levels. + figsize : tuple, default (6, 6) + Figure size. + ax : matplotlib.axes.Axes, optional + Existing axes to plot on. + + Returns + ------- + matplotlib.figure.Figure + """ + if sensitivity_of not in ["estimate", "t-value"]: + raise ValueError("sensitivity_of must be either 'estimate' or 't-value'.") + + if ky is None: + ky = kd + + idx = sens.model._coefnames.index(treatment) + estimate = sens.model._beta_hat[idx] + se = sens.model._se[idx] + + bound_r2dz_x: Optional[np.ndarray] = None + bound_r2yz_dx: Optional[np.ndarray] = None + bound_label: Optional[np.ndarray] = None + bound_value: Optional[np.ndarray] = None + + if benchmark_covariates is not None: + bounds = sens.ovb_bounds( + treatment=treatment, benchmark_covariates=benchmark_covariates, kd=kd, ky=ky + ) + bound_r2dz_x = np.asarray(bounds["r2dz_x"].values, dtype=float) + bound_r2yz_dx = np.asarray(bounds["r2yz_dx"].values, dtype=float) + bound_label = bounds["bound_label"].values + bound_value = ( + bounds["adjusted_estimate"].values + if sensitivity_of == "estimate" + else bounds["adjusted_t"].values + ) + + if lim is None: + if bound_r2dz_x is None: + lim = 0.4 + else: + lim = min(np.max(np.append(bound_r2dz_x * 1.2, 0.4)), 1 - 1e-12) + + if lim_y is None: + if bound_r2yz_dx is None: + lim_y = 0.4 + else: + lim_y = min(np.max(np.append(bound_r2yz_dx * 1.2, 0.4)), 1 - 1e-12) + + if lim > 1.0: + lim = 1 - 1e-12 + print("Warning: Contour limit larger than 1 was set to 1.") + elif lim < 0: + lim = 0.4 + print("Warning: Contour limit less than 0 was set to 0.4.") + + if lim_y > 1.0: + lim_y = 1 - 1e-12 + print("Warning: Contour limit larger than 1 was set to 1.") + elif lim_y < 0: + lim_y = 0.4 + print("Warning: Contour limit less than 0 was set to 0.4.") + + if label_bump_x is None: + label_bump_x = lim / 30.0 + if label_bump_y is None: + label_bump_y = lim_y / 30.0 + + threshold = estimate_threshold if sensitivity_of == "estimate" else t_threshold + + grid_values_x = np.arange(0, lim, lim / 400) + grid_values_y = np.arange(0, lim_y, lim_y / 400) + R2DZ, R2YZ = np.meshgrid(grid_values_x, grid_values_y) + + if sensitivity_of == "estimate": + z_axis = sens.adjusted_estimate( + r2dz_x=R2DZ, r2yz_dx=R2YZ, treatment=treatment, reduce=reduce + ) + plot_estimate = estimate + else: + z_axis = sens.adjusted_t( + r2dz_x=R2DZ, + r2yz_dx=R2YZ, + treatment=treatment, + reduce=reduce, + h0=estimate_threshold, + ) + plot_estimate = estimate / se # t-value = estimate / se + + # Handle edge cases (R2DZ >= 1) + z_axis = np.where(R2DZ >= 1, np.nan, z_axis) + + if ax is None: + fig, ax = plt.subplots(figsize=figsize) + else: + fig = ax.get_figure() + + z_min, z_max = np.nanmin(z_axis), np.nanmax(z_axis) + if n_levels: + n_levels = n_levels - 1 + delta = (z_max - z_min) / (n_levels + 1) + levels = [] + current = z_min + while current < z_max: + if not np.isclose(current, threshold, atol=1e-8): + levels.append(current) + current += delta + else: + levels = [ + lvl + for lvl in np.linspace(z_min, z_max, 10) + if not np.isclose(lvl, threshold, atol=1e-8) + ] + + CS = ax.contour( + grid_values_x, + grid_values_y, + z_axis, + colors=col_thr_line, + linewidths=1.0, + linestyles="solid", + levels=levels, + ) + ax.clabel(CS, inline=True, fontsize=8, fmt="%1.3g", colors="gray") + + CS_thr = ax.contour( + grid_values_x, + grid_values_y, + z_axis, + colors=col_thr_line, + linewidths=1.0, + linestyles=[(0, (7, 3))], + levels=[threshold], + ) + ax.clabel(CS_thr, inline=True, fontsize=8, fmt="%1.3g", colors="gray") + + # Unadjusted point + ax.scatter([0], [0], c="k", marker="^") + ax.annotate(f"Unadjusted\n({plot_estimate:.3f})", (label_bump_x, label_bump_y)) + + # Axis labels + if xlab is None: + xlab = r"Partial $R^2$ of confounder(s) with the treatment" + if ylab is None: + ylab = r"Partial $R^2$ of confounder(s) with the outcome" + ax.set_xlabel(xlab) + ax.set_ylabel(ylab) + ax.set_xlim(-(lim / 15.0), lim) + ax.set_ylim(-(lim_y / 15.0), lim_y) + + if ( + bound_r2dz_x is not None + and bound_r2yz_dx is not None + and bound_label is not None + and bound_value is not None + ): + for i in range(len(bound_r2dz_x)): + ax.scatter( + bound_r2dz_x[i], + bound_r2yz_dx[i], + c="red", + marker="D", + edgecolors="black", + ) + if label_text: + value = round(float(bound_value[i]), round_dig) + label = f"{bound_label[i]}\n({value})" + ax.annotate( + label, + ( + bound_r2dz_x[i] + label_bump_x, + bound_r2yz_dx[i] + label_bump_y, + ), + ) + + # Add margin + x0, x1, y0, y1 = ax.axis() + ax.axis( + (x0, x1 + plot_margin_fraction * lim, y0, y1 + plot_margin_fraction * lim_y) + ) + + plt.tight_layout() + plt.close() + + return fig + + +def ovb_extreme_plot( + sens: "SensitivityAnalysis", + treatment: str, + benchmark_covariates: Optional[Union[str, list]] = None, + kd: Union[float, list] = 1, + ky: Optional[Union[float, list]] = None, + r2yz_dx: Optional[list] = None, + reduce: bool = True, + threshold: float = 0, + lim: Optional[float] = None, + lim_y: Optional[float] = None, + xlab: Optional[str] = None, + ylab: Optional[str] = None, + figsize: tuple = (8, 4.8), + ax: Optional[plt.Axes] = None, +): + """ + Extreme scenario plots of omitted variable bias for sensitivity analysis. + + Parameters + ---------- + sens : SensitivityAnalysis + A SensitivityAnalysis object. + treatment : str + Name of the treatment variable. + benchmark_covariates : str or list, optional + Covariate(s) for benchmarking. + kd : float or list, default 1 + Multiplier for treatment-side bounds. + ky : float or list, optional + Multiplier for outcome-side bounds. + r2dz_x : float or list, optional + Manual partial R2 values for treatment axis ticks. + r2yz_dx : list, default [1.0, 0.75, 0.5] + Scenarios for outcome partial R2. + reduce : bool, default True + Whether confounding reduces the estimate. + threshold : float, default 0 + Threshold line for problematic effect. + lim : float, optional + X-axis limit. + lim_y : float, optional + Y-axis limit. + xlab : str, optional + X-axis label. + ylab : str, optional + Y-axis label. + figsize : tuple, default (8, 4.8) + Figure size. + ax : matplotlib.axes.Axes, optional + Existing axes. + + Returns + ------- + matplotlib.figure.Figure + """ + if ky is None: + ky = kd + if r2yz_dx is None: + r2yz_dx = [1.0, 0.75, 0.5] + + r2dz_x_bounds = None + + if benchmark_covariates is not None: + bounds = sens.ovb_bounds( + treatment=treatment, benchmark_covariates=benchmark_covariates, kd=kd, ky=ky + ) + r2dz_x_bounds = bounds["r2dz_x"].values + + if lim is None: + if r2dz_x_bounds is None: + lim = 0.1 + else: + lim = min(np.max(np.append(r2dz_x_bounds * 1.2, 0.1)), 1 - 1e-12) + + if lim > 1.0: + lim = 1 - 1e-12 + elif lim < 0: + lim = 0.4 + + if ax is None: + fig, ax = plt.subplots(figsize=figsize) + else: + fig = ax.get_figure() + + r2d_values = np.arange(0, lim, 0.001) + lim_y1, lim_y2 = None, None + + for i, r2yz in enumerate(r2yz_dx): + # Use existing method + y = sens.adjusted_estimate( + r2dz_x=r2d_values, r2yz_dx=r2yz, treatment=treatment, reduce=reduce + ) + y = np.where(r2d_values >= 1, np.nan, y) + + if i == 0: + ax.plot( + r2d_values, + y, + label=f"{round(r2yz * 100)}%", + linewidth=1.5, + linestyle="solid", + color="black", + ) + ax.axhline(y=threshold, color="r", linestyle="--") + lim_y1 = np.nanmax(y) + np.abs(np.nanmax(y)) / 15 + lim_y2 = np.nanmin(y) - np.abs(np.nanmin(y)) / 15 + + # Add rugs for bounds + if r2dz_x_bounds is not None: + for rug in r2dz_x_bounds: + ax.axvline(x=rug, ymin=0, ymax=0.022, color="r", linewidth=2.5) + else: + ax.plot( + r2d_values, + y, + label=f"{round(r2yz * 100)}%", + linewidth=np.abs(2.1 - 0.5 * i), + linestyle="--", + color="black", + ) + + # Legend and formatting + ax.legend(ncol=len(r2yz_dx), frameon=False) + ax.get_legend().set_title(r"Partial $R^2$ of confounder(s) with the outcome") + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + + # Labels + if xlab is None: + xlab = r"Partial $R^2$ of confounder(s) with the treatment" + if ylab is None: + ylab = "Adjusted effect estimate" + ax.set_xlabel(xlab) + ax.set_ylabel(ylab) + ax.set_xlim(-(lim / 35.0), lim + (lim / 35.0)) + + if lim_y is None: + ax.set_ylim(lim_y2, lim_y1) + else: + ax.set_ylim(-(lim_y / 15.0), lim_y) + + plt.tight_layout() + plt.close() + + return fig diff --git a/tests/test_sensitivity.py b/tests/test_sensitivity.py new file mode 100644 index 000000000..42d2ff822 --- /dev/null +++ b/tests/test_sensitivity.py @@ -0,0 +1,350 @@ +import matplotlib +import numpy as np +import pandas as pd +import pytest + +from pyfixest.estimation import feols, fepois +from pyfixest.utils.utils import get_data + +matplotlib.use("Agg") # Use non-interactive backend for testing +import matplotlib.pyplot as plt + + +def test_partial_measures(): + """Test Partial R2 and Partial f2 calculations.""" + data = get_data() + # Create a model: Y ~ X1 + X2 + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() # Or SensitivityAnalysis(fit) + + # 1. Scalar Case (Specific Variable) + r2_x1 = sens.partial_r2("X1") + f2_x1 = sens.partial_f2("X1") + + assert isinstance(r2_x1, (float, np.float64)) + assert 0 <= r2_x1 <= 1 + assert f2_x1 > 0 + # Check consistency: f2 = R2 / (1 - R2) + assert np.isclose(f2_x1, r2_x1 / (1 - r2_x1)) + + # 2. Vector Case (All Variables) + r2_all = sens.partial_r2() + assert len(r2_all) == 3 # Intercept, X1, X2 + assert isinstance(r2_all, np.ndarray) + + +def test_robustness_value_logic(): + """Test Robustness Value (RV) bounds and logic.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # RV should always be between 0 and 1 + rv = sens.robustness_value("X1", q=1, alpha=0.05) + assert 0 <= rv <= 1 + + # RV with alpha=1 (Point Estimate) should be >= RV with alpha=0.05 (Significance) + rv_q = sens.robustness_value("X1", q=1, alpha=1) + rv_qa = sens.robustness_value("X1", q=1, alpha=0.05) + assert rv_q >= rv_qa + + +def test_sensitivity_stats(): + """Test the main dictionary summary function.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Case 1: Specific variable (Scalar output) + stats_single = sens.sensitivity_stats(X="X1") + assert isinstance(stats_single, dict) + required_keys = ["estimate", "se", "partial_R2", "rv_q", "rv_qa"] + for key in required_keys: + assert key in stats_single + + # Case 2: All variables (Vector output) + stats_all = sens.sensitivity_stats(X=None) + assert isinstance(stats_all, dict) + assert len(stats_all["estimate"]) == len(fit._coefnames) + # Ensure RV is calculated for all + assert len(stats_all["rv_q"]) == len(fit._coefnames) + + +def test_ovb_bounds_structure(): + """Test if ovb_bounds returns the correct DataFrame structure.""" + data = get_data() + # Model: Y ~ X1 + X2 + # We will treat X1 as treatment, X2 as the benchmark + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + bounds = sens.ovb_bounds( + treatment="X1", benchmark_covariates="X2", kd=[1, 2], ky=[1, 2] + ) + + # 1. Check Return Type + assert isinstance(bounds, pd.DataFrame) + + # 2. Check Dimensions + # We asked for kd=[1,2] for 1 benchmark ("X2"). Should be 2 rows. + assert len(bounds) == 2 + + # 3. Check Columns + expected_cols = [ + "bound_label", + "r2dz_x", + "r2yz_dx", + "treatment", + "adjusted_estimate", + "adjusted_se", + "adjusted_t", + "adjusted_lower_CI", + "adjusted_upper_CI", + ] + for col in expected_cols: + assert col in bounds.columns + + +def test_ovb_bounds_adjustment_logic(): + """Test if the adjusted estimates move in the expected direction.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Get original estimate for X1 + original_est = fit.coef()["X1"] + + # Calculate bounds with reduce=True (Default) + # This should shrink the estimate towards zero + bounds = sens.ovb_bounds(treatment="X1", benchmark_covariates="X2", kd=1) + + adj_est = bounds["adjusted_estimate"].iloc[0] + + # Assert magnitude reduction + assert abs(adj_est) < abs(original_est) + + +def test_summary_smoke_test(capsys): + """ + Smoke test for the summary print method. + Uses capsys to suppress/capture stdout. + """ + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Should run without error + sens.summary(treatment="X1", benchmark_covariates="X2") + + # Optional: Check if output contains expected strings + captured = capsys.readouterr() + assert "Sensitivity Analysis to Unobserved Confounding" in captured.out + assert "Bounds on omitted variable bias" in captured.out + assert "1x X2" in captured.out + + +def test_error_handling(): + """Ensure proper errors are raised for invalid inputs.""" + # 1. Setup Data with guaranteed correlation + # We need X1 and X2 to be correlated so that r2dxj_x > 0. + # If r2dxj_x is 0, no kd value will ever trigger the "Impossible" error. + rng = np.random.default_rng(123) + N = 100 + X2 = rng.normal(0, 1, N) + # Make X1 explicitly dependent on X2 (Correlation ~ 0.5) + X1 = 0.5 * X2 + rng.normal(0, 1, N) + Y = X1 + X2 + rng.normal(0, 1, N) + + data = pd.DataFrame({"Y": Y, "X1": X1, "X2": X2}) + + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # 2. Test Invalid Bound Type + with pytest.raises(ValueError, match="Only partial r2 is implemented"): + sens.ovb_bounds(treatment="X1", benchmark_covariates="X2", bound="invalid_type") + + # 3. Test Impossible kd value + # Since X1 and X2 are correlated, a massive kd will imply R2 > 1 + with pytest.raises(ValueError, match="Impossible scenario"): + sens.ovb_bounds(treatment="X1", benchmark_covariates="X2", kd=1000) + + +def test_sensitivity_analysis_feols_supported(): + """Test that sensitivity analysis works for Feols (OLS) models.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + + # Should not raise + sens = fit.sensitivity_analysis() + assert sens is not None + + +def test_sensitivity_analysis_fepois_not_supported(): + """Test that sensitivity analysis raises error for Poisson models.""" + data = get_data() + data = data.dropna() # Remove NaN values first + data["Y_count"] = np.abs(data["Y"]).astype(int) + 1 + + fit = fepois("Y_count ~ X1 + X2", data=data) + + with pytest.raises(ValueError, match="only supported for OLS"): + fit.sensitivity_analysis() + + +def test_sensitivity_analysis_feiv_not_supported(): + """Test that sensitivity analysis raises error for IV models.""" + data = get_data() + data["Z1"] = data["X1"] + np.random.normal(0, 0.1, len(data)) # Instrument + + fit = feols("Y ~ 1 | X1 ~ Z1", data=data) + + with pytest.raises(ValueError, match="only supported for OLS"): + fit.sensitivity_analysis() + + +@pytest.mark.extended +def test_contour_plot_basic(): + """Smoke test for contour plot with default parameters.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Basic contour plot - estimate sensitivity + fig = sens.plot(treatment="X1", plot_type="contour") + assert isinstance(fig, matplotlib.figure.Figure) + plt.close("all") + + +@pytest.mark.extended +def test_contour_plot_with_benchmarks(): + """Test contour plot with benchmark covariates.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Contour plot with benchmarks + fig = sens.plot( + treatment="X1", plot_type="contour", benchmark_covariates="X2", kd=[1, 2] + ) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close("all") + + +@pytest.mark.extended +def test_contour_plot_t_value(): + """Test contour plot with t-value sensitivity.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Contour plot with t-value sensitivity + fig = sens.plot(treatment="X1", plot_type="contour", sensitivity_of="t-value") + assert isinstance(fig, matplotlib.figure.Figure) + plt.close("all") + + +@pytest.mark.extended +def test_contour_plot_custom_params(): + """Test contour plot with various custom parameters.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Contour plot with custom parameters + fig = sens.plot( + treatment="X1", + plot_type="contour", + benchmark_covariates="X2", + kd=[1, 2, 3], + ky=[1, 2, 3], + lim=0.5, + lim_y=0.5, + figsize=(8, 8), + estimate_threshold=0, + t_threshold=1.96, + ) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close("all") + + +@pytest.mark.extended +def test_extreme_plot_basic(): + """Smoke test for extreme plot with default parameters.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Basic extreme plot + fig = sens.plot(treatment="X1", plot_type="extreme") + assert isinstance(fig, matplotlib.figure.Figure) + plt.close("all") + + +@pytest.mark.extended +def test_extreme_plot_with_benchmarks(): + """Test extreme plot with benchmark covariates.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Extreme plot with benchmarks + fig = sens.plot( + treatment="X1", plot_type="extreme", benchmark_covariates="X2", kd=[1, 2] + ) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close("all") + + +@pytest.mark.extended +def test_extreme_plot_custom_params(): + """Test extreme plot with custom parameters.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + # Extreme plot with custom r2yz_dx scenarios + fig = sens.plot( + treatment="X1", + plot_type="extreme", + r2yz_dx=[1.0, 0.75, 0.5, 0.25], + figsize=(10, 6), + threshold=0.5, + ) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close("all") + + +@pytest.mark.extended +def test_plot_invalid_type(): + """Test that invalid plot_type raises ValueError.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + with pytest.raises(ValueError, match='plot_type must be "contour" or "extreme"'): + sens.plot(treatment="X1", plot_type="invalid") + + +@pytest.mark.extended +def test_extreme_plot_t_value_not_implemented(): + """Test that extreme plot with t-value raises NotImplementedError.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + with pytest.raises( + NotImplementedError, match="Extreme sensitivity plots for t-values" + ): + sens.plot(treatment="X1", plot_type="extreme", sensitivity_of="t-value") + + +@pytest.mark.extended +def test_contour_plot_invalid_sensitivity_of(): + """Test that invalid sensitivity_of raises ValueError in contour plot.""" + data = get_data() + fit = feols("Y ~ X1 + X2", data=data) + sens = fit.sensitivity_analysis() + + with pytest.raises(ValueError, match="sensitivity_of must be either"): + sens.plot(treatment="X1", plot_type="contour", sensitivity_of="invalid")