diff --git a/python/benchmark/benchmark/bench_linear_regression.py b/python/benchmark/benchmark/bench_linear_regression.py index d9030d95..2b38cdf2 100644 --- a/python/benchmark/benchmark/bench_linear_regression.py +++ b/python/benchmark/benchmark/bench_linear_regression.py @@ -114,8 +114,8 @@ def run_once( ) # note: results for spark ML and spark rapids ml will currently match in all regularization - # cases only if features and labels were standardized in the original dataset. Otherwise, - # they will match only if regParam = 0 or elastNetParam = 1.0 (aka Lasso) + # cases only if features and labels were standardized in the original dataset or if standardization is enabled. + # Otherwise, they will match only if regParam = 0 or elasticNetParam = 1.0 (aka Lasso) print( f"RMSE: {rmse}, coefs l1: {coefs_l1}, coefs l2^2: {coefs_l2}, " f"full_objective: {full_objective}, intercept: {model.intercept}" diff --git a/python/src/spark_rapids_ml/classification.py b/python/src/spark_rapids_ml/classification.py index 46b5dae4..7ed24431 100644 --- a/python/src/spark_rapids_ml/classification.py +++ b/python/src/spark_rapids_ml/classification.py @@ -1011,53 +1011,14 @@ def _logistic_regression_fit( # Use cupy to standardize dataset as a workaround to gain better numeric stability standarization_with_cupy = standardization and not is_sparse if standarization_with_cupy is True: - import cupy as cp - - if isinstance(concated, np.ndarray): - concated = cp.array(concated) - elif isinstance(concated, pd.DataFrame): - concated = cp.array(concated.values) - else: - assert isinstance( - concated, cp.ndarray - ), "only numpy array, cupy array, and pandas dataframe are supported when standardization_with_cupy is on" - - mean_partial = concated.sum(axis=0) / pdesc.m - - import json - - from pyspark import BarrierTaskContext - - context = BarrierTaskContext.get() - - def all_gather_then_sum( - cp_array: cp.ndarray, dtype: Union[np.float32, np.float64] - ) -> cp.ndarray: - msgs = context.allGather(json.dumps(cp_array.tolist())) - arrays = [json.loads(p) for p in msgs] - array_sum = np.sum(arrays, axis=0).astype(dtype) - return cp.array(array_sum) - - mean = all_gather_then_sum(mean_partial, concated.dtype) - concated -= mean - - l2 = cp.linalg.norm(concated, ord=2, axis=0) - - var_partial = l2 * l2 / (pdesc.m - 1) - var = all_gather_then_sum(var_partial, concated.dtype) - - assert cp.all( - var >= 0 - ), "numeric instable detected when calculating variance. Got negative variance" - - stddev = cp.sqrt(var) - - stddev_inv = cp.where(stddev != 0, 1.0 / stddev, 1.0) - - if fit_intercept is False: - concated += mean - - concated *= stddev_inv + from .utils import _standardize_dataset + + # TODO: fix for multiple param sweep that change standardization and/or fit intercept (unlikely scenario) since + # data modification effects all params. currently not invoked in these cases by fitMultiple (see fitMultiple) + _tmp_data = [(concated, None, None)] + # this will modify concated in place through _tmp_data + mean, stddev = _standardize_dataset(_tmp_data, pdesc, fit_intercept) + concated = _tmp_data[0][0] def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: if standarization_with_cupy is True: diff --git a/python/src/spark_rapids_ml/core.py b/python/src/spark_rapids_ml/core.py index 132c1001..8f4030ee 100644 --- a/python/src/spark_rapids_ml/core.py +++ b/python/src/spark_rapids_ml/core.py @@ -76,12 +76,15 @@ from .metrics import EvalMetricInfo from .params import _CumlParams from .utils import ( + FitInputType, _ArrayOrder, _configure_memory_resource, _get_gpu_id, _get_spark_session, _is_local, _is_standalone_or_localcluster, + _SingleNpArrayBatchType, + _SinglePdDataFrameBatchType, dtype_to_pyspark_type, get_logger, ) @@ -95,14 +98,6 @@ _CumlParamMap = Dict[str, Any] -_SinglePdDataFrameBatchType = Tuple[ - pd.DataFrame, Optional[pd.DataFrame], Optional[pd.DataFrame] -] -_SingleNpArrayBatchType = Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]] - -# FitInputType is type of [(feature, label), ...] -FitInputType = Union[List[_SinglePdDataFrameBatchType], List[_SingleNpArrayBatchType]] - # TransformInput type TransformInputType = Union["cudf.DataFrame", np.ndarray] @@ -1170,6 +1165,8 @@ def fitMultiple( using `paramMaps[index]`. `index` values may not be sequential. """ + logger = get_logger(self.__class__) + if self._use_cpu_fallback(): return super().fitMultiple(dataset, paramMaps) @@ -1177,6 +1174,18 @@ def fitMultiple( for paramMap in paramMaps: if self._use_cpu_fallback(paramMap): return super().fitMultiple(dataset, paramMaps) + # standardization and fitIntercept currently may modify the dataset and is done once outside the param loop. + # If either appears in a param map, fall back to regular multiple passfitMultiple. + # TODO: sparse logistic regression does not modify data so ok in that case. Need logic to check dataset to detect that case. + # TODO: implement single pass with either of these by processing param maps with no + # standardization or fitIntercept before those with standardization or fitIntercept. + param_names = [p.name for p in paramMap.keys()] + for unsupported in ["standardization", "fitIntercept"]: + if unsupported in param_names: + logger.warning( + f"{unsupported} in param maps not supported for one pass GPU fitMultiple. Falling back to baseline fitMultiple." + ) + return super().fitMultiple(dataset, paramMaps) # reach here if no cpu fallback estimator = self.copy() diff --git a/python/src/spark_rapids_ml/regression.py b/python/src/spark_rapids_ml/regression.py index 05c8a58d..33419263 100644 --- a/python/src/spark_rapids_ml/regression.py +++ b/python/src/spark_rapids_ml/regression.py @@ -191,7 +191,7 @@ def _param_mapping(cls) -> Dict[str, Optional[str]]: "maxIter": "max_iter", "regParam": "alpha", "solver": "solver", - "standardization": "normalize", + "standardization": "normalize", # TODO: standardization is carried out in cupy not cuml so need a new type of param mapped value to indicate that. "tol": "tol", "weightCol": None, } @@ -309,9 +309,9 @@ class LinearRegression( Notes ----- - Results for spark ML and spark rapids ml fit() will currently match in all regularization - cases only if features and labels are standardized in the input dataframe. Otherwise, - they will match only if regParam = 0 or elastNetParam = 1.0 (aka Lasso). + Results for spark ML and spark rapids ml fit() will currently be close in all regularization + cases only if features and labels are standardized in the input dataframe or when standardization is enabled. Otherwise, + they will be close only if regParam = 0 or elasticNetParam = 1.0 (aka Lasso). Parameters ---------- @@ -513,6 +513,10 @@ def _get_cuml_fit_func( [FitInputType, Dict[str, Any]], Dict[str, Any], ]: + + standardization = self.getStandardization() + fit_intercept = self.getFitIntercept() + def _linear_regression_fit( dfs: FitInputType, params: Dict[str, Any], @@ -522,6 +526,20 @@ def _linear_regression_fit( params[param_alias.part_sizes], params[param_alias.num_cols] ) + pdesc_labels = PartitionDescriptor.build(params[param_alias.part_sizes], 1) + + if standardization: + from .utils import _standardize_dataset + + # this modifies dfs in place by copying to gpu and standardazing in place on gpu + # TODO: fix for multiple param sweep that change standardization and/or fit intercept (unlikely scenario) since + # data modification effects all params. currently not invoked in these cases by fitMultiple (see fitMultiple) + mean, stddev = _standardize_dataset(dfs, pdesc, fit_intercept) + stddev_label = stddev[-1] + stddev_features = stddev[:-1] + mean_label = mean[-1] + mean_features = mean[:-1] + def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: if init_parameters["alpha"] == 0: # LR @@ -532,7 +550,6 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: supported_params = [ "algorithm", "fit_intercept", - "normalize", "verbose", "copy_X", ] @@ -547,18 +564,19 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: "alpha", "solver", "fit_intercept", - "normalize", "verbose", ] # spark ML normalizes sample portion of objective by the number of examples # but cuml does not for RidgeRegression (l1_ratio=0). Induce similar behavior # to spark ml by scaling up the reg parameter by the number of examples. # With this, spark ML and spark rapids ML results match closely when features - # and label columns are all standardized. + # and label columns are all standardized, or when standardization is enabled. init_parameters = init_parameters.copy() if "alpha" in init_parameters.keys(): init_parameters["alpha"] *= (float)(pdesc.m) - + if standardization: + # key to matching mllib when standardization is enabled + init_parameters["alpha"] /= stddev_label else: # LR + L1, or LR + L1 + L2 # Cuml uses Coordinate Descent algorithm to implement Lasso and ElasticNet @@ -575,12 +593,15 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: "l1_ratio", "fit_intercept", "max_iter", - "normalize", "tol", "shuffle", "verbose", ] + if standardization: + # key to matching mllib when standardization is enabled + init_parameters["alpha"] /= stddev_label + # filter only supported params final_init_parameters = { k: v for k, v in init_parameters.items() if k in supported_params @@ -604,9 +625,28 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: pdesc.rank, ) + coef_ = linear_regression.coef_ + intercept_ = linear_regression.intercept_ + + if standardization is True: + import cupy as cp + + coef_ = cp.where( + stddev_features > 0, + (coef_ / stddev_features) * stddev_label, + coef_, + ) + if init_parameters["fit_intercept"] is True: + + intercept_ = ( + intercept_ * stddev_label + - cp.dot(coef_, mean_features) + + mean_label + ).tolist() + return { - "coef_": linear_regression.coef_.get().tolist(), - "intercept_": linear_regression.intercept_, + "coef_": coef_.tolist(), + "intercept_": intercept_, "dtype": linear_regression.dtype.name, "n_cols": linear_regression.n_cols, } diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py index 6b81c977..7e1ab923 100644 --- a/python/src/spark_rapids_ml/utils.py +++ b/python/src/spark_rapids_ml/utils.py @@ -43,6 +43,13 @@ from pyspark.sql.types import ArrayType, FloatType _ArrayOrder = Literal["C", "F"] +_SinglePdDataFrameBatchType = Tuple[ + pd.DataFrame, Optional[pd.DataFrame], Optional[pd.DataFrame] +] +_SingleNpArrayBatchType = Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray]] + +# FitInputType is type of [(feature, label), ...] +FitInputType = Union[List[_SinglePdDataFrameBatchType], List[_SingleNpArrayBatchType]] def _method_names_from_param(spark_param_name: str) -> List[str]: @@ -809,3 +816,112 @@ def getInputOrFeaturesCols(est: Union[Estimator, Transformer]) -> str: else getattr(est, "getInputCol") ) return getter() + + +def _standardize_dataset( + data: FitInputType, pdesc: PartitionDescriptor, fit_intercept: bool +) -> Tuple["cp.ndarray", "cp.ndarray"]: + """Inplace standardize the dataset feature and optionally label columns + + Args: + data: dataset to standardize (including features and label) + pdesc: Partition descriptor + fit_intercept: Whether to fit intercept in calling fit function. + + Returns: + Mean and standard deviation of features and label columns (latter is last element if present) + Modifies data entries by replacing entries with standardized data on gpu. + If data is already on gpu, modifies in place (i.e. no copy is made). + """ + import cupy as cp + + mean_partials_labels = ( + cp.zeros(1, dtype=data[0][1].dtype) if data[0][1] is not None else None + ) + mean_partials = [cp.zeros(pdesc.n, dtype=data[0][0].dtype), mean_partials_labels] + for i in range(len(data)): + _data = [] + for j in range(2): + if data[i][j] is not None: + + if isinstance(data[i][j], cp.ndarray): + _data.append(data[i][j]) # type: ignore + elif isinstance(data[i][j], np.ndarray): + _data.append(cp.array(data[i][j])) # type: ignore + elif isinstance(data[i][j], pd.DataFrame) or isinstance( + data[i][j], pd.Series + ): + _data.append(cp.array(data[i][j].values)) # type: ignore + else: + raise ValueError("Unsupported data type: ", type(data[i][j])) + mean_partials[j] += _data[j].sum(axis=0) / pdesc.m # type: ignore + else: + _data.append(None) + data[i] = (_data[0], _data[1], data[i][2]) # type: ignore + + import json + + from pyspark import BarrierTaskContext + + context = BarrierTaskContext.get() + + def all_gather_then_sum( + cp_array: cp.ndarray, dtype: Union[np.float32, np.float64] + ) -> cp.ndarray: + msgs = context.allGather(json.dumps(cp_array.tolist())) + arrays = [json.loads(p) for p in msgs] + array_sum = np.sum(arrays, axis=0).astype(dtype) + return cp.array(array_sum) + + if mean_partials[1] is not None: + mean_partial = cp.concatenate(mean_partials) # type: ignore + else: + mean_partial = mean_partials[0] + mean = all_gather_then_sum(mean_partial, mean_partial.dtype) + + _mean = (mean[:-1], mean[-1]) if mean_partials[1] is not None else (mean, None) + + var_partials_labels = ( + cp.zeros(1, dtype=data[0][1].dtype) if data[0][1] is not None else None + ) + var_partials = [cp.zeros(pdesc.n, dtype=data[0][0].dtype), var_partials_labels] + for i in range(len(data)): + for j in range(2): + if data[i][j] is not None and _mean[j] is not None: + __data = data[i][j] + __data -= _mean[j] # type: ignore + l2 = cp.linalg.norm(__data, ord=2, axis=0) + var_partials[j] += l2 * l2 / (pdesc.m - 1) + + if var_partials[1] is not None: + var_partial = cp.concatenate((var_partials[0], var_partials[1])) + else: + var_partial = var_partials[0] + var = all_gather_then_sum(var_partial, var_partial.dtype) + + assert cp.all( + var >= 0 + ), "numeric instable detected when calculating variance. Got negative variance" + + stddev = cp.sqrt(var) + stddev_inv = cp.where(stddev != 0, 1.0 / stddev, 1.0) + _stddev_inv = ( + (stddev_inv[:-1], stddev_inv[-1]) + if var_partials[1] is not None + else (stddev_inv, None) + ) + + if fit_intercept is False: + for i in range(len(data)): + for j in range(2): + if data[i][j] is not None and _mean[j] is not None: + __data = data[i][j] + __data += _mean[j] # type: ignore + + for i in range(len(data)): + for j in range(2): + if data[i][j] is not None and _stddev_inv[j] is not None: + __data = data[i][j] + __data *= _stddev_inv[j] # type: ignore + + return mean, stddev diff --git a/python/tests/test_linear_model.py b/python/tests/test_linear_model.py index acb40c7e..9da55087 100644 --- a/python/tests/test_linear_model.py +++ b/python/tests/test_linear_model.py @@ -542,6 +542,25 @@ def test_linear_regression_spark_compat( assert model.transform(df).take(1) == model2.transform(df).take(1) assert model.numFeatures == 2 + lr.setRegParam(2.0) + lr.setMaxIter(200) + lr.setElasticNetParam(0.0) + model = lr.fit(df) + coefficients = model.coefficients.toArray() + expected_coefficients = [92.22569365, 12.84336458] + # TODO: investigate why relatively large tol (2 % - 10 %) is needed to match mllib + # (though much closer than before adding standardization) + assert all(np.isclose(coefficients, expected_coefficients, rtol=0.02)) + assert np.isclose(model.intercept, 1.76595778134947, rtol=0.1) + + lr.setElasticNetParam(0.5) + model = lr.fit(df) + coefficients = model.coefficients.toArray() + expected_coefficients = [91.9070094, 11.23076474] + # TODO: investigate why relatively large tol is needed to match mllib + assert all(np.isclose(coefficients, expected_coefficients, rtol=0.02)) + assert np.isclose(model.intercept, 3.138371491598421, rtol=0.1) + @pytest.mark.parametrize("params_exception", params_exception) def test_fail_run_on_1_col( @@ -597,64 +616,88 @@ def test_lr_fit_multiple_in_single_pass( initial_lr = lr.copy() - param_maps: List[Dict[Param, Any]] = [ - # alpha = 0, LinearRegression - { - lr.tol: 0.00001, - lr.standardization: False, - lr.loss: "squared_loss", - lr.regParam: 0, - lr.elasticNetParam: 0, - lr.fitIntercept: True, - lr.maxIter: 39, - lr.solver: "auto", - }, - # Ridge - { - lr.tol: 0.00002, - lr.standardization: True, - lr.loss: "squared_loss", - lr.regParam: 0.2, - lr.elasticNetParam: 0, - lr.fitIntercept: True, - lr.maxIter: 29, - lr.solver: "auto", - }, - # Lasso - { - lr.tol: 0.00003, - lr.standardization: False, - lr.loss: "squared_loss", - lr.regParam: 0.3, - lr.elasticNetParam: 1, - lr.fitIntercept: True, - lr.maxIter: 59, - lr.solver: "auto", - }, - # ElasticNet - { - lr.tol: 0.00004, - lr.standardization: False, - lr.loss: "squared_loss", - lr.regParam: 0.5, - lr.elasticNetParam: 0.6, - lr.fitIntercept: False, - lr.maxIter: 69, - lr.solver: "auto", - }, + param_maps_list: List[List[Dict[Param, Any]]] = [ + [ + # alpha = 0, LinearRegression + { + lr.tol: 0.00001, + lr.standardization: False, + lr.loss: "squared_loss", + lr.regParam: 0, + lr.elasticNetParam: 0, + lr.fitIntercept: True, + lr.maxIter: 39, + lr.solver: "auto", + }, + # Ridge + { + lr.tol: 0.00002, + lr.standardization: True, + lr.loss: "squared_loss", + lr.regParam: 0.2, + lr.elasticNetParam: 0, + lr.fitIntercept: True, + lr.maxIter: 29, + lr.solver: "auto", + }, + # Lasso + { + lr.tol: 0.00003, + lr.standardization: False, + lr.loss: "squared_loss", + lr.regParam: 0.3, + lr.elasticNetParam: 1, + lr.fitIntercept: True, + lr.maxIter: 59, + lr.solver: "auto", + }, + # ElasticNet + { + lr.tol: 0.00004, + lr.standardization: False, + lr.loss: "squared_loss", + lr.regParam: 0.5, + lr.elasticNetParam: 0.6, + lr.fitIntercept: False, + lr.maxIter: 69, + lr.solver: "auto", + }, + ], + [ + # Ridge + { + lr.tol: 0.00002, + lr.loss: "squared_loss", + lr.regParam: 0.2, + lr.elasticNetParam: 0, + lr.maxIter: 29, + lr.solver: "auto", + }, + # ElasticNet + { + lr.tol: 0.00004, + lr.loss: "squared_loss", + lr.regParam: 0.5, + lr.elasticNetParam: 0.6, + lr.maxIter: 69, + lr.solver: "auto", + }, + ], ] - models = lr.fit(train_df, param_maps) - for i, param_map in enumerate(param_maps): - rf = initial_lr.copy() - single_model = rf.fit(train_df, param_map) + for param_maps in param_maps_list: + models = lr.fit(train_df, param_maps) + + for i, param_map in enumerate(param_maps): + rf = initial_lr.copy() + single_model = rf.fit(train_df, param_map) - assert single_model.coefficients == models[i].coefficients - assert single_model.intercept == models[i].intercept + assert single_model.coefficients == models[i].coefficients + assert single_model.intercept == models[i].intercept - for k, v in param_map.items(): - assert models[i].getOrDefault(k.name) == v - assert single_model.getOrDefault(k.name) == v + for k, v in param_map.items(): + assert models[i].getOrDefault(k.name) == v + assert single_model.getOrDefault(k.name) == v @pytest.mark.parametrize("feature_type", [feature_types.vector]) diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index 8de1223d..ab84fffd 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -778,50 +778,66 @@ def test_lr_fit_multiple_in_single_pass( initial_lr = lr.copy() - param_maps: List[Dict[Param, Any]] = [ - { - lr.tol: 1, - lr.regParam: 0, - lr.fitIntercept: True, - lr.maxIter: 39, - }, - { - lr.tol: 0.01, - lr.regParam: 0.5, - lr.fitIntercept: False, - lr.maxIter: 100, - }, - { - lr.tol: 0.03, - lr.regParam: 0.7, - lr.fitIntercept: True, - lr.maxIter: 29, - }, - { - lr.tol: 0.0003, - lr.regParam: 0.9, - lr.fitIntercept: False, - lr.maxIter: 89, - }, + param_maps_list: List[List[Dict[Param, Any]]] = [ + [ + { + lr.tol: 1, + lr.regParam: 0, + lr.fitIntercept: True, + lr.maxIter: 39, + }, + { + lr.tol: 0.01, + lr.regParam: 0.5, + lr.fitIntercept: False, + lr.maxIter: 100, + }, + { + lr.tol: 0.03, + lr.regParam: 0.7, + lr.fitIntercept: True, + lr.maxIter: 29, + }, + { + lr.tol: 0.0003, + lr.regParam: 0.9, + lr.fitIntercept: False, + lr.maxIter: 89, + }, + ], + [ + { + lr.tol: 0.00001, + lr.regParam: 0, + lr.maxIter: 39, + }, + { + lr.tol: 0.0003, + lr.regParam: 0.9, + lr.maxIter: 89, + }, + ], ] - models = lr.fit(train_df, param_maps) - for i, param_map in enumerate(param_maps): - rf = initial_lr.copy() - single_model = rf.fit(train_df, param_map) + for param_maps in param_maps_list: + models = lr.fit(train_df, param_maps) - assert array_equal( - single_model.coefficients.toArray(), - models[i].coefficients.toArray(), - tolerance, - ) - assert array_equal( - [single_model.intercept], [models[i].intercept], tolerance - ) + for i, param_map in enumerate(param_maps): + rf = initial_lr.copy() + single_model = rf.fit(train_df, param_map) + + assert array_equal( + single_model.coefficients.toArray(), + models[i].coefficients.toArray(), + tolerance, + ) + assert array_equal( + [single_model.intercept], [models[i].intercept], tolerance + ) - for k, v in param_map.items(): - assert models[i].getOrDefault(k.name) == v - assert single_model.getOrDefault(k.name) == v + for k, v in param_map.items(): + assert models[i].getOrDefault(k.name) == v + assert single_model.getOrDefault(k.name) == v @pytest.mark.compat diff --git a/python/tests/test_tuning.py b/python/tests/test_tuning.py index a19e7512..ec11872d 100644 --- a/python/tests/test_tuning.py +++ b/python/tests/test_tuning.py @@ -13,14 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from typing import Tuple, Union +from typing import List, Tuple, Union +from unittest.mock import patch import numpy as np import pytest +from _pytest.logging import LogCaptureFixture from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.tuning import CrossValidatorModel, ParamGridBuilder -from spark_rapids_ml.regression import RandomForestRegressor +from spark_rapids_ml.regression import LinearRegression, RandomForestRegressor from spark_rapids_ml.tuning import CrossValidator from .sparksession import CleanSparkSession @@ -99,3 +101,49 @@ def check_cv(cv_est: Union[CrossValidator, CrossValidatorModel]) -> None: assert evaluator.evaluate(cv_model.transform(df)) == evaluator.evaluate( cv_model_loaded.transform(df) ) + + +@pytest.mark.parametrize("data_type", [np.float32]) +@pytest.mark.parametrize("standardization_settings", [[False], []]) +def test_crossvalidator_standardization_fallback( + data_type: np.dtype, standardization_settings: list[bool], caplog: LogCaptureFixture +) -> None: + """Test that CrossValidator falls back to mllib when standardization is in param maps.""" + X, _, y, _ = make_regression_dataset( + datatype=data_type, + nrows=100, + ncols=8, + ) + + with CleanSparkSession() as spark: + df, features_col, label_col = create_pyspark_dataframe( + spark, feature_types.vector, data_type, X, y + ) + assert label_col is not None + + lr = LinearRegression() + lr.setFeaturesCol(features_col) + lr.setLabelCol(label_col) + + evaluator = RegressionEvaluator() + evaluator.setLabelCol(label_col) + + grid_builder = ParamGridBuilder() + if standardization_settings: + grid_builder.addGrid(lr.standardization, standardization_settings) + grid_builder.addGrid(lr.regParam, [0.01, 0.1]) + grid = grid_builder.build() + + cv = CrossValidator( + estimator=lr, + estimatorParamMaps=grid, + evaluator=evaluator, + numFolds=2, + seed=101, + ) + + cv_model = cv.fit(df) + if standardization_settings: + assert "Falling back to baseline fitMultiple" in caplog.text + else: + assert "Falling back to baseline fitMultiple" not in caplog.text diff --git a/python/tests/test_utils.py b/python/tests/test_utils.py index 5c3b5680..70adbf49 100644 --- a/python/tests/test_utils.py +++ b/python/tests/test_utils.py @@ -14,8 +14,9 @@ # limitations under the License. # -from typing import Dict, Iterator, List, Optional, Union +from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union +import cupy as cp import numpy as np import pandas as pd import pytest @@ -200,3 +201,43 @@ def test_concat_with_reserved_gpu_mem( assert ( "Reserved" in caplog.text and "GB GPU memory for training data" in caplog.text ) + + +def test_standardize_data() -> None: + from spark_rapids_ml.utils import PartitionDescriptor, _standardize_dataset + + from .sparksession import CleanSparkSession + + def test_udf(iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]: + data = [ + ( + np.array([[1.0, 2.0], [3.0, 4.0]] * 100, dtype=np.float32), + np.array([1.0, 2.0] * 100, dtype=np.float32), + None, + ) + ] + pdesc = PartitionDescriptor(200, 2, 0, [(200, 2)]) + mean, stddev = _standardize_dataset(data, pdesc, True) # type: ignore + + assert isinstance(data[0][0], cp.ndarray) + assert isinstance(data[0][1], cp.ndarray) + + # not exact due to unbiased vs. sample variance calculation + assert all(np.isclose(mean.get(), np.array([2.0, 3.0, 1.5]), atol=1e-2)) # type: ignore + assert all(np.isclose(stddev.get(), np.array([1.0, 1.0, 0.5]), atol=1e-2)) # type: ignore + + assert all( + np.isclose( + data[0][0].get().reshape(-1), # type: ignore + np.array([[-1.0, -1.0], [1.0, 1.0]] * 100).reshape(-1), + atol=1e-2, + ) + ) + assert all(np.isclose(data[0][1].get(), np.array([-1.0, 1.0] * 100), atol=1e-2)) # type: ignore + + for df in iter: + yield df + + with CleanSparkSession() as spark: + df = spark.range(1).repartition(1).mapInPandas(test_udf, schema="id int") + df.rdd.barrier().mapPartitions(lambda x: x).collect()