From 8a9e62e1b7edfd57780c0b27f77789da04897715 Mon Sep 17 00:00:00 2001 From: Erik Ordentlich Date: Sat, 1 Nov 2025 12:52:20 -0700 Subject: [PATCH 1/3] make linear regression standardization behavior closer to apache spark; avoid on-gpu fitmultiple when param maps standardize (for now) Signed-off-by: Erik Ordentlich --- python/src/spark_rapids_ml/classification.py | 55 +------ python/src/spark_rapids_ml/core.py | 25 ++- python/src/spark_rapids_ml/regression.py | 62 ++++++-- python/src/spark_rapids_ml/utils.py | 116 ++++++++++++++ python/tests/test_linear_model.py | 151 ++++++++++++------- python/tests/test_logistic_regression.py | 96 +++++++----- python/tests/test_tuning.py | 52 ++++++- python/tests/test_utils.py | 43 +++++- 8 files changed, 437 insertions(+), 163 deletions(-) diff --git a/python/src/spark_rapids_ml/classification.py b/python/src/spark_rapids_ml/classification.py index 46b5dae42..7ed244318 100644 --- a/python/src/spark_rapids_ml/classification.py +++ b/python/src/spark_rapids_ml/classification.py @@ -1011,53 +1011,14 @@ def _logistic_regression_fit( # Use cupy to standardize dataset as a workaround to gain better numeric stability standarization_with_cupy = standardization and not is_sparse if standarization_with_cupy is True: - import cupy as cp - - if isinstance(concated, np.ndarray): - concated = cp.array(concated) - elif isinstance(concated, pd.DataFrame): - concated = cp.array(concated.values) - else: - assert isinstance( - concated, cp.ndarray - ), "only numpy array, cupy array, and pandas dataframe are supported when standardization_with_cupy is on" - - mean_partial = concated.sum(axis=0) / pdesc.m - - import json - - from pyspark import BarrierTaskContext - - context = BarrierTaskContext.get() - - def all_gather_then_sum( - cp_array: cp.ndarray, dtype: Union[np.float32, np.float64] - ) -> cp.ndarray: - msgs = context.allGather(json.dumps(cp_array.tolist())) - arrays = [json.loads(p) for p in msgs] - array_sum = np.sum(arrays, axis=0).astype(dtype) - return cp.array(array_sum) - - mean = all_gather_then_sum(mean_partial, concated.dtype) - concated -= mean - - l2 = cp.linalg.norm(concated, ord=2, axis=0) - - var_partial = l2 * l2 / (pdesc.m - 1) - var = all_gather_then_sum(var_partial, concated.dtype) - - assert cp.all( - var >= 0 - ), "numeric instable detected when calculating variance. Got negative variance" - - stddev = cp.sqrt(var) - - stddev_inv = cp.where(stddev != 0, 1.0 / stddev, 1.0) - - if fit_intercept is False: - concated += mean - - concated *= stddev_inv + from .utils import _standardize_dataset + + # TODO: fix for multiple param sweep that change standardization and/or fit intercept (unlikely scenario) since + # data modification effects all params. currently not invoked in these cases by fitMultiple (see fitMultiple) + _tmp_data = [(concated, None, None)] + # this will modify concated in place through _tmp_data + mean, stddev = _standardize_dataset(_tmp_data, pdesc, fit_intercept) + concated = _tmp_data[0][0] def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: if standarization_with_cupy is True: diff --git a/python/src/spark_rapids_ml/core.py b/python/src/spark_rapids_ml/core.py index 09070d43b..96b44e192 100644 --- a/python/src/spark_rapids_ml/core.py +++ b/python/src/spark_rapids_ml/core.py @@ -76,12 +76,15 @@ from .metrics import EvalMetricInfo from .params import _CumlParams from .utils import ( + FitInputType, _ArrayOrder, _configure_memory_resource, _get_gpu_id, _get_spark_session, _is_local, _is_standalone_or_localcluster, + _SingleNpArrayBatchType, + _SinglePdDataFrameBatchType, dtype_to_pyspark_type, get_logger, ) @@ -95,14 +98,6 @@ _CumlParamMap = Dict[str, Any] -_SinglePdDataFrameBatchType = Tuple[ - pd.DataFrame, Optional[pd.DataFrame], Optional[pd.DataFrame] -] -_SingleNpArrayBatchType = Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]] - -# FitInputType is type of [(feature, label), ...] -FitInputType = Union[List[_SinglePdDataFrameBatchType], List[_SingleNpArrayBatchType]] - # TransformInput type TransformInputType = Union["cudf.DataFrame", np.ndarray] @@ -1063,6 +1058,8 @@ def fitMultiple( using `paramMaps[index]`. `index` values may not be sequential. """ + logger = get_logger(self.__class__) + if self._use_cpu_fallback(): return super().fitMultiple(dataset, paramMaps) @@ -1070,6 +1067,18 @@ def fitMultiple( for paramMap in paramMaps: if self._use_cpu_fallback(paramMap): return super().fitMultiple(dataset, paramMaps) + # standardization and fitIntercept currently may modify the dataset and is done once outside the param loop. + # If either appears in a param map, fall back to regular multiple passfitMultiple. + # TODO: sparse logistic regression does not modify data so ok in that case. Need logic to check dataset to detect that case. + # TODO: implement single pass with either of these by processing param maps with no + # standardization or fitIntercept before those with standardization or fitIntercept. + param_names = [p.name for p in paramMap.keys()] + for unsupported in ["standardization", "fitIntercept"]: + if unsupported in param_names: + logger.warning( + f"{unsupported} in param maps not supported for one pass GPU fitMultiple. Falling back to baseline fitMultiple." + ) + return super().fitMultiple(dataset, paramMaps) # reach here if no cpu fallback estimator = self.copy() diff --git a/python/src/spark_rapids_ml/regression.py b/python/src/spark_rapids_ml/regression.py index 05c8a58d8..bdbd8f893 100644 --- a/python/src/spark_rapids_ml/regression.py +++ b/python/src/spark_rapids_ml/regression.py @@ -191,7 +191,7 @@ def _param_mapping(cls) -> Dict[str, Optional[str]]: "maxIter": "max_iter", "regParam": "alpha", "solver": "solver", - "standardization": "normalize", + "standardization": "normalize", # TODO: standardization is carried out in cupy not cuml so need a new type of param mapped value to indicate that. "tol": "tol", "weightCol": None, } @@ -309,9 +309,9 @@ class LinearRegression( Notes ----- - Results for spark ML and spark rapids ml fit() will currently match in all regularization - cases only if features and labels are standardized in the input dataframe. Otherwise, - they will match only if regParam = 0 or elastNetParam = 1.0 (aka Lasso). + Results for spark ML and spark rapids ml fit() will currently be close in all regularization + cases only if features and labels are standardized in the input dataframe or when standardization is enabled. Otherwise, + they will be close only if regParam = 0 or elastNetParam = 1.0 (aka Lasso). Parameters ---------- @@ -513,6 +513,10 @@ def _get_cuml_fit_func( [FitInputType, Dict[str, Any]], Dict[str, Any], ]: + + standardization = self.getStandardization() + fit_intercept = self.getFitIntercept() + def _linear_regression_fit( dfs: FitInputType, params: Dict[str, Any], @@ -522,6 +526,20 @@ def _linear_regression_fit( params[param_alias.part_sizes], params[param_alias.num_cols] ) + pdesc_labels = PartitionDescriptor.build(params[param_alias.part_sizes], 1) + + if standardization: + from .utils import _standardize_dataset + + # this modifies dfs in place by copying to gpu and standardazing in place on gpu + # TODO: fix for multiple param sweep that change standardization and/or fit intercept (unlikely scenario) since + # data modification effects all params. currently not invoked in these cases by fitMultiple (see fitMultiple) + mean, stddev = _standardize_dataset(dfs, pdesc, fit_intercept) + stddev_label = stddev[-1] + stddev_features = stddev[:-1] + mean_label = mean[-1] + mean_features = mean[:-1] + def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: if init_parameters["alpha"] == 0: # LR @@ -532,7 +550,6 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: supported_params = [ "algorithm", "fit_intercept", - "normalize", "verbose", "copy_X", ] @@ -547,18 +564,19 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: "alpha", "solver", "fit_intercept", - "normalize", "verbose", ] # spark ML normalizes sample portion of objective by the number of examples # but cuml does not for RidgeRegression (l1_ratio=0). Induce similar behavior # to spark ml by scaling up the reg parameter by the number of examples. # With this, spark ML and spark rapids ML results match closely when features - # and label columns are all standardized. + # and label columns are all standardized, or when standardization is enabled. init_parameters = init_parameters.copy() if "alpha" in init_parameters.keys(): init_parameters["alpha"] *= (float)(pdesc.m) - + if standardization: + # key to matching mllib when standardization is enabled + init_parameters["alpha"] /= stddev_label else: # LR + L1, or LR + L1 + L2 # Cuml uses Coordinate Descent algorithm to implement Lasso and ElasticNet @@ -575,12 +593,15 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: "l1_ratio", "fit_intercept", "max_iter", - "normalize", "tol", "shuffle", "verbose", ] + if standardization: + # key to matching mllib when standardization is enabled + init_parameters["alpha"] /= stddev_label + # filter only supported params final_init_parameters = { k: v for k, v in init_parameters.items() if k in supported_params @@ -604,9 +625,28 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: pdesc.rank, ) + coef_ = linear_regression.coef_ + intercept_ = linear_regression.intercept_ + + if standardization is True: + import cupy as cp + + coef_ = cp.where( + stddev_features > 0, + (coef_ / stddev_features) * stddev_label, + coef_, + ) + if init_parameters["fit_intercept"] is True: + + intercept_ = ( + intercept_ * stddev_label + - cp.dot(coef_, mean_features) + + mean_label + ).tolist() + return { - "coef_": linear_regression.coef_.get().tolist(), - "intercept_": linear_regression.intercept_, + "coef_": coef_.tolist(), + "intercept_": intercept_, "dtype": linear_regression.dtype.name, "n_cols": linear_regression.n_cols, } diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py index 6b81c9770..de7ca12c4 100644 --- a/python/src/spark_rapids_ml/utils.py +++ b/python/src/spark_rapids_ml/utils.py @@ -43,6 +43,13 @@ from pyspark.sql.types import ArrayType, FloatType _ArrayOrder = Literal["C", "F"] +_SinglePdDataFrameBatchType = Tuple[ + pd.DataFrame, Optional[pd.DataFrame], Optional[pd.DataFrame] +] +_SingleNpArrayBatchType = Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]] + +# FitInputType is type of [(feature, label), ...] +FitInputType = Union[List[_SinglePdDataFrameBatchType], List[_SingleNpArrayBatchType]] def _method_names_from_param(spark_param_name: str) -> List[str]: @@ -809,3 +816,112 @@ def getInputOrFeaturesCols(est: Union[Estimator, Transformer]) -> str: else getattr(est, "getInputCol") ) return getter() + + +def _standardize_dataset( + data: FitInputType, pdesc: PartitionDescriptor, fit_intercept: bool +) -> Tuple["cp.ndarray", "cp.ndarray"]: + """Inplace standardize the dataset feature and optionally label columns + + Args: + data: dataset to standardize (including features and label) + pdesc: Partition descriptor + fit_intercept: Whether to fit intercept in calling fit function. + + Returns: + Mean and standard deviation of features and label columns (latter is last element if present) + Modifies data entries by replacing entries with standardized data on gpu. + If data is already on cpu, modifies in place. + """ + import cupy as cp + + mean_partials_labels = ( + cp.zeros(1, dtype=data[0][1].dtype) if data[0][1] is not None else None + ) + mean_partials = [cp.zeros(pdesc.n, dtype=data[0][0].dtype), mean_partials_labels] + for i in range(len(data)): + _data = [] + for j in range(2): + if data[i][j] is not None: + + if isinstance(data[i][j], cp.ndarray): + _data.append(data[i][j]) # type: ignore + elif isinstance(data[i][j], np.ndarray): + _data.append(cp.array(data[i][j])) # type: ignore + elif isinstance(data[i][j], pd.DataFrame) or isinstance( + data[i][j], pd.Series + ): + _data.append(cp.array(data[i][j].values)) # type: ignore + else: + raise ValueError("Unsupported data type: ", type(data[i][j])) + mean_partials[j] += _data[j].sum(axis=0) / pdesc.m # type: ignore + else: + _data.append(None) + data[i] = (_data[0], _data[1], data[i][2]) # type: ignore + + import json + + from pyspark import BarrierTaskContext + + context = BarrierTaskContext.get() + + def all_gather_then_sum( + cp_array: cp.ndarray, dtype: Union[np.float32, np.float64] + ) -> cp.ndarray: + msgs = context.allGather(json.dumps(cp_array.tolist())) + arrays = [json.loads(p) for p in msgs] + array_sum = np.sum(arrays, axis=0).astype(dtype) + return cp.array(array_sum) + + if mean_partials[1] is not None: + mean_partial = cp.concatenate(mean_partials) # type: ignore + else: + mean_partial = mean_partials[0] + mean = all_gather_then_sum(mean_partial, mean_partial.dtype) + + _mean = (mean[:-1], mean[-1]) if mean_partials[1] is not None else (mean, None) + + var_partials_labels = ( + cp.zeros(1, dtype=data[0][1].dtype) if data[0][1] is not None else None + ) + var_partials = [cp.zeros(pdesc.n, dtype=data[0][0].dtype), var_partials_labels] + for i in range(len(data)): + for j in range(2): + if data[i][j] is not None and _mean[j] is not None: + __data = data[i][j] + __data -= _mean[j] # type: ignore + l2 = cp.linalg.norm(__data, ord=2, axis=0) + var_partials[j] += l2 * l2 / (pdesc.m - 1) + + if var_partials[1] is not None: + var_partial = cp.concatenate((var_partials[0], var_partials[1])) + else: + var_partial = var_partials[0] + var = all_gather_then_sum(var_partial, var_partial.dtype) + + assert cp.all( + var >= 0 + ), "numeric instable detected when calculating variance. Got negative variance" + + stddev = cp.sqrt(var) + stddev_inv = cp.where(stddev != 0, 1.0 / stddev, 1.0) + _stddev_inv = ( + (stddev_inv[:-1], stddev_inv[-1]) + if var_partials[1] is not None + else (stddev_inv, None) + ) + + if fit_intercept is False: + for i in range(len(data)): + for j in range(2): + if data[i][j] is not None and _mean[j] is not None: + __data = data[i][j] + __data += _mean[j] # type: ignore + + for i in range(len(data)): + for j in range(2): + if data[i][j] is not None and _stddev_inv[j] is not None: + __data = data[i][j] + __data *= _stddev_inv[j] # type: ignore + + return mean, stddev diff --git a/python/tests/test_linear_model.py b/python/tests/test_linear_model.py index acb40c7ed..9da550873 100644 --- a/python/tests/test_linear_model.py +++ b/python/tests/test_linear_model.py @@ -542,6 +542,25 @@ def test_linear_regression_spark_compat( assert model.transform(df).take(1) == model2.transform(df).take(1) assert model.numFeatures == 2 + lr.setRegParam(2.0) + lr.setMaxIter(200) + lr.setElasticNetParam(0.0) + model = lr.fit(df) + coefficients = model.coefficients.toArray() + expected_coefficients = [92.22569365, 12.84336458] + # TODO: investigate why relatively large tol (2 % - 10 %) is needed to match mllib + # (though much closer than before adding standardization) + assert all(np.isclose(coefficients, expected_coefficients, rtol=0.02)) + assert np.isclose(model.intercept, 1.76595778134947, rtol=0.1) + + lr.setElasticNetParam(0.5) + model = lr.fit(df) + coefficients = model.coefficients.toArray() + expected_coefficients = [91.9070094, 11.23076474] + # TODO: investigate why relatively large tol is needed to match mllib + assert all(np.isclose(coefficients, expected_coefficients, rtol=0.02)) + assert np.isclose(model.intercept, 3.138371491598421, rtol=0.1) + @pytest.mark.parametrize("params_exception", params_exception) def test_fail_run_on_1_col( @@ -597,64 +616,88 @@ def test_lr_fit_multiple_in_single_pass( initial_lr = lr.copy() - param_maps: List[Dict[Param, Any]] = [ - # alpha = 0, LinearRegression - { - lr.tol: 0.00001, - lr.standardization: False, - lr.loss: "squared_loss", - lr.regParam: 0, - lr.elasticNetParam: 0, - lr.fitIntercept: True, - lr.maxIter: 39, - lr.solver: "auto", - }, - # Ridge - { - lr.tol: 0.00002, - lr.standardization: True, - lr.loss: "squared_loss", - lr.regParam: 0.2, - lr.elasticNetParam: 0, - lr.fitIntercept: True, - lr.maxIter: 29, - lr.solver: "auto", - }, - # Lasso - { - lr.tol: 0.00003, - lr.standardization: False, - lr.loss: "squared_loss", - lr.regParam: 0.3, - lr.elasticNetParam: 1, - lr.fitIntercept: True, - lr.maxIter: 59, - lr.solver: "auto", - }, - # ElasticNet - { - lr.tol: 0.00004, - lr.standardization: False, - lr.loss: "squared_loss", - lr.regParam: 0.5, - lr.elasticNetParam: 0.6, - lr.fitIntercept: False, - lr.maxIter: 69, - lr.solver: "auto", - }, + param_maps_list: List[List[Dict[Param, Any]]] = [ + [ + # alpha = 0, LinearRegression + { + lr.tol: 0.00001, + lr.standardization: False, + lr.loss: "squared_loss", + lr.regParam: 0, + lr.elasticNetParam: 0, + lr.fitIntercept: True, + lr.maxIter: 39, + lr.solver: "auto", + }, + # Ridge + { + lr.tol: 0.00002, + lr.standardization: True, + lr.loss: "squared_loss", + lr.regParam: 0.2, + lr.elasticNetParam: 0, + lr.fitIntercept: True, + lr.maxIter: 29, + lr.solver: "auto", + }, + # Lasso + { + lr.tol: 0.00003, + lr.standardization: False, + lr.loss: "squared_loss", + lr.regParam: 0.3, + lr.elasticNetParam: 1, + lr.fitIntercept: True, + lr.maxIter: 59, + lr.solver: "auto", + }, + # ElasticNet + { + lr.tol: 0.00004, + lr.standardization: False, + lr.loss: "squared_loss", + lr.regParam: 0.5, + lr.elasticNetParam: 0.6, + lr.fitIntercept: False, + lr.maxIter: 69, + lr.solver: "auto", + }, + ], + [ + # Ridge + { + lr.tol: 0.00002, + lr.loss: "squared_loss", + lr.regParam: 0.2, + lr.elasticNetParam: 0, + lr.maxIter: 29, + lr.solver: "auto", + }, + # ElasticNet + { + lr.tol: 0.00004, + lr.loss: "squared_loss", + lr.regParam: 0.5, + lr.elasticNetParam: 0.6, + lr.maxIter: 69, + lr.solver: "auto", + }, + ], ] - models = lr.fit(train_df, param_maps) - for i, param_map in enumerate(param_maps): - rf = initial_lr.copy() - single_model = rf.fit(train_df, param_map) + for param_maps in param_maps_list: + models = lr.fit(train_df, param_maps) + + for i, param_map in enumerate(param_maps): + rf = initial_lr.copy() + single_model = rf.fit(train_df, param_map) - assert single_model.coefficients == models[i].coefficients - assert single_model.intercept == models[i].intercept + assert single_model.coefficients == models[i].coefficients + assert single_model.intercept == models[i].intercept - for k, v in param_map.items(): - assert models[i].getOrDefault(k.name) == v - assert single_model.getOrDefault(k.name) == v + for k, v in param_map.items(): + assert models[i].getOrDefault(k.name) == v + assert single_model.getOrDefault(k.name) == v @pytest.mark.parametrize("feature_type", [feature_types.vector]) diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index 8de1223d6..ab84fffda 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -778,50 +778,66 @@ def test_lr_fit_multiple_in_single_pass( initial_lr = lr.copy() - param_maps: List[Dict[Param, Any]] = [ - { - lr.tol: 1, - lr.regParam: 0, - lr.fitIntercept: True, - lr.maxIter: 39, - }, - { - lr.tol: 0.01, - lr.regParam: 0.5, - lr.fitIntercept: False, - lr.maxIter: 100, - }, - { - lr.tol: 0.03, - lr.regParam: 0.7, - lr.fitIntercept: True, - lr.maxIter: 29, - }, - { - lr.tol: 0.0003, - lr.regParam: 0.9, - lr.fitIntercept: False, - lr.maxIter: 89, - }, + param_maps_list: List[List[Dict[Param, Any]]] = [ + [ + { + lr.tol: 1, + lr.regParam: 0, + lr.fitIntercept: True, + lr.maxIter: 39, + }, + { + lr.tol: 0.01, + lr.regParam: 0.5, + lr.fitIntercept: False, + lr.maxIter: 100, + }, + { + lr.tol: 0.03, + lr.regParam: 0.7, + lr.fitIntercept: True, + lr.maxIter: 29, + }, + { + lr.tol: 0.0003, + lr.regParam: 0.9, + lr.fitIntercept: False, + lr.maxIter: 89, + }, + ], + [ + { + lr.tol: 0.00001, + lr.regParam: 0, + lr.maxIter: 39, + }, + { + lr.tol: 0.0003, + lr.regParam: 0.9, + lr.maxIter: 89, + }, + ], ] - models = lr.fit(train_df, param_maps) - for i, param_map in enumerate(param_maps): - rf = initial_lr.copy() - single_model = rf.fit(train_df, param_map) + for param_maps in param_maps_list: + models = lr.fit(train_df, param_maps) - assert array_equal( - single_model.coefficients.toArray(), - models[i].coefficients.toArray(), - tolerance, - ) - assert array_equal( - [single_model.intercept], [models[i].intercept], tolerance - ) + for i, param_map in enumerate(param_maps): + rf = initial_lr.copy() + single_model = rf.fit(train_df, param_map) + + assert array_equal( + single_model.coefficients.toArray(), + models[i].coefficients.toArray(), + tolerance, + ) + assert array_equal( + [single_model.intercept], [models[i].intercept], tolerance + ) - for k, v in param_map.items(): - assert models[i].getOrDefault(k.name) == v - assert single_model.getOrDefault(k.name) == v + for k, v in param_map.items(): + assert models[i].getOrDefault(k.name) == v + assert single_model.getOrDefault(k.name) == v @pytest.mark.compat diff --git a/python/tests/test_tuning.py b/python/tests/test_tuning.py index a19e7512d..ec11872d1 100644 --- a/python/tests/test_tuning.py +++ b/python/tests/test_tuning.py @@ -13,14 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from typing import Tuple, Union +from typing import List, Tuple, Union +from unittest.mock import patch import numpy as np import pytest +from _pytest.logging import LogCaptureFixture from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.tuning import CrossValidatorModel, ParamGridBuilder -from spark_rapids_ml.regression import RandomForestRegressor +from spark_rapids_ml.regression import LinearRegression, RandomForestRegressor from spark_rapids_ml.tuning import CrossValidator from .sparksession import CleanSparkSession @@ -99,3 +101,49 @@ def check_cv(cv_est: Union[CrossValidator, CrossValidatorModel]) -> None: assert evaluator.evaluate(cv_model.transform(df)) == evaluator.evaluate( cv_model_loaded.transform(df) ) + + +@pytest.mark.parametrize("data_type", [np.float32]) +@pytest.mark.parametrize("standardization_settings", [[False], []]) +def test_crossvalidator_standardization_fallback( + data_type: np.dtype, standardization_settings: list[bool], caplog: LogCaptureFixture +) -> None: + """Test that CrossValidator falls back to mllib when standardization is in param maps.""" + X, _, y, _ = make_regression_dataset( + datatype=data_type, + nrows=100, + ncols=8, + ) + + with CleanSparkSession() as spark: + df, features_col, label_col = create_pyspark_dataframe( + spark, feature_types.vector, data_type, X, y + ) + assert label_col is not None + + lr = LinearRegression() + lr.setFeaturesCol(features_col) + lr.setLabelCol(label_col) + + evaluator = RegressionEvaluator() + evaluator.setLabelCol(label_col) + + grid_builder = ParamGridBuilder() + if standardization_settings: + grid_builder.addGrid(lr.standardization, standardization_settings) + grid_builder.addGrid(lr.regParam, [0.01, 0.1]) + grid = grid_builder.build() + + cv = CrossValidator( + estimator=lr, + estimatorParamMaps=grid, + evaluator=evaluator, + numFolds=2, + seed=101, + ) + + cv_model = cv.fit(df) + if standardization_settings: + assert "Falling back to baseline fitMultiple" in caplog.text + else: + assert "Falling back to baseline fitMultiple" not in caplog.text diff --git a/python/tests/test_utils.py b/python/tests/test_utils.py index 5c3b56806..70adbf493 100644 --- a/python/tests/test_utils.py +++ b/python/tests/test_utils.py @@ -14,8 +14,9 @@ # limitations under the License. # -from typing import Dict, Iterator, List, Optional, Union +from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union +import cupy as cp import numpy as np import pandas as pd import pytest @@ -200,3 +201,43 @@ def test_concat_with_reserved_gpu_mem( assert ( "Reserved" in caplog.text and "GB GPU memory for training data" in caplog.text ) + + +def test_standardize_data() -> None: + from spark_rapids_ml.utils import PartitionDescriptor, _standardize_dataset + + from .sparksession import CleanSparkSession + + def test_udf(iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]: + data = [ + ( + np.array([[1.0, 2.0], [3.0, 4.0]] * 100, dtype=np.float32), + np.array([1.0, 2.0] * 100, dtype=np.float32), + None, + ) + ] + pdesc = PartitionDescriptor(200, 2, 0, [(200, 2)]) + mean, stddev = _standardize_dataset(data, pdesc, True) # type: ignore + + assert isinstance(data[0][0], cp.ndarray) + assert isinstance(data[0][1], cp.ndarray) + + # not exact due to unbiased vs. sample variance calculation + assert all(np.isclose(mean.get(), np.array([2.0, 3.0, 1.5]), atol=1e-2)) # type: ignore + assert all(np.isclose(stddev.get(), np.array([1.0, 1.0, 0.5]), atol=1e-2)) # type: ignore + + assert all( + np.isclose( + data[0][0].get().reshape(-1), # type: ignore + np.array([[-1.0, -1.0], [1.0, 1.0]] * 100).reshape(-1), + atol=1e-2, + ) + ) + assert all(np.isclose(data[0][1].get(), np.array([-1.0, 1.0] * 100), atol=1e-2)) # type: ignore + + for df in iter: + yield df + + with CleanSparkSession() as spark: + df = spark.range(1).repartition(1).mapInPandas(test_udf, schema="id int") + df.rdd.barrier().mapPartitions(lambda x: x).collect() From 83c014824a6030d6772182ff7d7fe17c061b9fda Mon Sep 17 00:00:00 2001 From: Erik Ordentlich Date: Sat, 1 Nov 2025 13:34:16 -0700 Subject: [PATCH 2/3] minor fixes Signed-off-by: Erik Ordentlich --- python/benchmark/benchmark/bench_linear_regression.py | 2 +- python/src/spark_rapids_ml/regression.py | 2 +- python/src/spark_rapids_ml/utils.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/benchmark/benchmark/bench_linear_regression.py b/python/benchmark/benchmark/bench_linear_regression.py index d9030d959..1af1fa7b2 100644 --- a/python/benchmark/benchmark/bench_linear_regression.py +++ b/python/benchmark/benchmark/bench_linear_regression.py @@ -115,7 +115,7 @@ def run_once( # note: results for spark ML and spark rapids ml will currently match in all regularization # cases only if features and labels were standardized in the original dataset. Otherwise, - # they will match only if regParam = 0 or elastNetParam = 1.0 (aka Lasso) + # they will match only if regParam = 0 or elastNeticParam = 1.0 (aka Lasso) print( f"RMSE: {rmse}, coefs l1: {coefs_l1}, coefs l2^2: {coefs_l2}, " f"full_objective: {full_objective}, intercept: {model.intercept}" diff --git a/python/src/spark_rapids_ml/regression.py b/python/src/spark_rapids_ml/regression.py index bdbd8f893..4f8e7ad11 100644 --- a/python/src/spark_rapids_ml/regression.py +++ b/python/src/spark_rapids_ml/regression.py @@ -311,7 +311,7 @@ class LinearRegression( ----- Results for spark ML and spark rapids ml fit() will currently be close in all regularization cases only if features and labels are standardized in the input dataframe or when standardization is enabled. Otherwise, - they will be close only if regParam = 0 or elastNetParam = 1.0 (aka Lasso). + they will be close only if regParam = 0 or elastNeticParam = 1.0 (aka Lasso). Parameters ---------- diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py index de7ca12c4..7e1ab9237 100644 --- a/python/src/spark_rapids_ml/utils.py +++ b/python/src/spark_rapids_ml/utils.py @@ -831,7 +831,7 @@ def _standardize_dataset( Returns: Mean and standard deviation of features and label columns (latter is last element if present) Modifies data entries by replacing entries with standardized data on gpu. - If data is already on cpu, modifies in place. + If data is already on gpu, modifies in place (i.e. no copy is made). """ import cupy as cp From 45d0ae788abaf158f14436e170462f3652b839d0 Mon Sep 17 00:00:00 2001 From: Erik Ordentlich Date: Tue, 4 Nov 2025 15:31:20 -0800 Subject: [PATCH 3/3] typos and update comment in bench script Signed-off-by: Erik Ordentlich --- python/benchmark/benchmark/bench_linear_regression.py | 4 ++-- python/src/spark_rapids_ml/regression.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/benchmark/benchmark/bench_linear_regression.py b/python/benchmark/benchmark/bench_linear_regression.py index 1af1fa7b2..2b38cdf22 100644 --- a/python/benchmark/benchmark/bench_linear_regression.py +++ b/python/benchmark/benchmark/bench_linear_regression.py @@ -114,8 +114,8 @@ def run_once( ) # note: results for spark ML and spark rapids ml will currently match in all regularization - # cases only if features and labels were standardized in the original dataset. Otherwise, - # they will match only if regParam = 0 or elastNeticParam = 1.0 (aka Lasso) + # cases only if features and labels were standardized in the original dataset or if standardization is enabled. + # Otherwise, they will match only if regParam = 0 or elasticNetParam = 1.0 (aka Lasso) print( f"RMSE: {rmse}, coefs l1: {coefs_l1}, coefs l2^2: {coefs_l2}, " f"full_objective: {full_objective}, intercept: {model.intercept}" diff --git a/python/src/spark_rapids_ml/regression.py b/python/src/spark_rapids_ml/regression.py index 4f8e7ad11..334192634 100644 --- a/python/src/spark_rapids_ml/regression.py +++ b/python/src/spark_rapids_ml/regression.py @@ -311,7 +311,7 @@ class LinearRegression( ----- Results for spark ML and spark rapids ml fit() will currently be close in all regularization cases only if features and labels are standardized in the input dataframe or when standardization is enabled. Otherwise, - they will be close only if regParam = 0 or elastNeticParam = 1.0 (aka Lasso). + they will be close only if regParam = 0 or elasticNetParam = 1.0 (aka Lasso). Parameters ----------