diff --git a/python/src/spark_rapids_ml/clustering.py b/python/src/spark_rapids_ml/clustering.py index c768c62b..67974cdd 100644 --- a/python/src/spark_rapids_ml/clustering.py +++ b/python/src/spark_rapids_ml/clustering.py @@ -60,6 +60,7 @@ _concat_and_free, _configure_memory_resource, _get_spark_session, + _memadvise_cpu, get_logger, java_uid, ) @@ -974,9 +975,7 @@ def _cuml_fit( if cuda_managed_mem_enabled or cuda_system_mem_enabled: # for sam, pin numpy array to host to avoid observed page migration to device during later concatenation if cuda_system_mem_enabled and isinstance(features, np.ndarray): - cp.cuda.runtime.memAdvise( - features.ctypes.data, features.nbytes, 3, -1 - ) + _memadvise_cpu(features.ctypes.data, features.nbytes) features = cp.array(features) inputs.append(features) diff --git a/python/src/spark_rapids_ml/core.py b/python/src/spark_rapids_ml/core.py index 088f6823..11e1f282 100644 --- a/python/src/spark_rapids_ml/core.py +++ b/python/src/spark_rapids_ml/core.py @@ -83,6 +83,7 @@ _get_spark_session, _is_local, _is_standalone_or_localcluster, + _memadvise_cpu, _SingleNpArrayBatchType, _SinglePdDataFrameBatchType, dtype_to_pyspark_type, @@ -922,18 +923,14 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame: # for sam, pin numpy array to host to avoid observed page migration to device during later concatenation # TODO: revise for non-concatenating algos PCA and LinearRegression which should advise these to device as much as possible if cuda_system_mem_enabled and isinstance(features, np.ndarray): - cp.cuda.runtime.memAdvise( - features.ctypes.data, features.nbytes, 3, -1 - ) + _memadvise_cpu(features.ctypes.data, features.nbytes) features = cp.array(features) # for sam sparse case, pin numpy subarrays to host to avoid observed page migration to device during later concatenation # TODO: revise for non-concatenating algos PCA and LinearRegression if cuda_system_mem_enabled and isinstance(features, csr_matrix): for arr in 
[features.data, features.indptr, features.indices]: - cp.cuda.runtime.memAdvise( - arr.ctypes.data, arr.nbytes, 3, -1 - ) + _memadvise_cpu(arr.ctypes.data, arr.nbytes) label = pdf[alias.label] if alias.label in pdf.columns else None row_number = ( diff --git a/python/src/spark_rapids_ml/umap.py b/python/src/spark_rapids_ml/umap.py index 9ab47727..d0e8cb15 100644 --- a/python/src/spark_rapids_ml/umap.py +++ b/python/src/spark_rapids_ml/umap.py @@ -96,6 +96,7 @@ _configure_memory_resource, _get_spark_session, _is_local, + _memadvise_cpu, dtype_to_pyspark_type, get_logger, ) @@ -1200,15 +1201,13 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]: ) and not use_sparse_array: # for sam, pin numpy array to host to avoid observed page migration to device during later concatenation if cuda_system_mem_enabled and isinstance(features, np.ndarray): - cp.cuda.runtime.memAdvise( - features.ctypes.data, features.nbytes, 3, -1 - ) + _memadvise_cpu(features.ctypes.data, features.nbytes) features = cp.array(features) # for sam sparse case, pin numpy subarrays to host to avoid observed page migration to device during later concatenation if cuda_system_mem_enabled and isinstance(features, csr_matrix): for arr in [features.data, features.indptr, features.indices]: - cp.cuda.runtime.memAdvise(arr.ctypes.data, arr.nbytes, 3, -1) + _memadvise_cpu(arr.ctypes.data, arr.nbytes) label = pdf[alias.label] if alias.label in pdf.columns else None row_number = ( diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py index 1d36ea37..f8ac301a 100644 --- a/python/src/spark_rapids_ml/utils.py +++ b/python/src/spark_rapids_ml/utils.py @@ -240,6 +240,37 @@ def _configure_memory_resource( import spark_rapids_ml.numpy_allocator +def _memadvise_cpu(data: Any, nbytes: int) -> None: + """ + Advise data referenced by pointer to stay in cpu memory. 
+    For use with SAM to prevent migration of partial arrays staged in host memory to device during +    gpu concatenation. +    """ +    import cuda.bindings +    import cupy as cp +    from packaging.version import parse + +    # latest cupy 13.6 has a bug in the cuda13 version of the memadvise api so use low level +    # python bindings from cuda-python for that case. Once the patch is released, we can revert to using the cupy api. +    if parse(cuda.bindings.__version__) < parse("13.0.0"): +        cp.cuda.runtime.memAdvise(data, nbytes, 3, -1) +    else: +        from cuda.bindings.runtime import ( +            cudaMemLocation, +            cudaMemLocationType, +            cudaMemoryAdvise, +        ) + +        mem_location = cudaMemLocation() +        mem_location.type = cudaMemLocationType.cudaMemLocationTypeHost +        cuda.bindings.runtime.cudaMemAdvise( +            data, +            nbytes, +            cudaMemoryAdvise.cudaMemAdviseSetPreferredLocation, +            mem_location, +        ) + + def _get_default_params_from_func( func: Callable, unsupported_set: List[str] = [] ) -> Dict[str, Any]: