From d4683d47ccbe549b2cde7b1035bf68774066f135 Mon Sep 17 00:00:00 2001 From: Erik Ordentlich Date: Mon, 22 Dec 2025 22:07:37 -0800 Subject: [PATCH 1/4] add work around for memadvise cupy bug for cuda13 Signed-off-by: Erik Ordentlich --- python/src/spark_rapids_ml/clustering.py | 5 ++-- python/src/spark_rapids_ml/core.py | 9 +++---- python/src/spark_rapids_ml/umap.py | 7 +++--- python/src/spark_rapids_ml/utils.py | 31 ++++++++++++++++++++++++ 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/python/src/spark_rapids_ml/clustering.py b/python/src/spark_rapids_ml/clustering.py index c768c62b..67974cdd 100644 --- a/python/src/spark_rapids_ml/clustering.py +++ b/python/src/spark_rapids_ml/clustering.py @@ -60,6 +60,7 @@ _concat_and_free, _configure_memory_resource, _get_spark_session, + _memadvise_cpu, get_logger, java_uid, ) @@ -974,9 +975,7 @@ def _cuml_fit( if cuda_managed_mem_enabled or cuda_system_mem_enabled: # for sam, pin numpy array to host to avoid observed page migration to device during later concatenation if cuda_system_mem_enabled and isinstance(features, np.ndarray): - cp.cuda.runtime.memAdvise( - features.ctypes.data, features.nbytes, 3, -1 - ) + _memadvise_cpu(features.ctypes.data, features.nbytes) features = cp.array(features) inputs.append(features) diff --git a/python/src/spark_rapids_ml/core.py b/python/src/spark_rapids_ml/core.py index 088f6823..11e1f282 100644 --- a/python/src/spark_rapids_ml/core.py +++ b/python/src/spark_rapids_ml/core.py @@ -83,6 +83,7 @@ _get_spark_session, _is_local, _is_standalone_or_localcluster, + _memadvise_cpu, _SingleNpArrayBatchType, _SinglePdDataFrameBatchType, dtype_to_pyspark_type, @@ -922,18 +923,14 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame: # for sam, pin numpy array to host to avoid observed page migration to device during later concatenation # TODO: revise for non-concatenating algos PCA and LinearRegression which should advise these to device as much as possible if cuda_system_mem_enabled and isinstance(features, np.ndarray): - cp.cuda.runtime.memAdvise( - features.ctypes.data, features.nbytes, 3, -1 - ) + _memadvise_cpu(features.ctypes.data, features.nbytes) features = cp.array(features) # for sam sparse case, pin numpy subarrays to host to avoid observed page migration to device during later concatenation # TODO: revise for non-concatenating algos PCA and LinearRegression if cuda_system_mem_enabled and isinstance(features, csr_matrix): for arr in [features.data, features.indptr, features.indices]: - cp.cuda.runtime.memAdvise( - arr.ctypes.data, arr.nbytes, 3, -1 - ) + _memadvise_cpu(arr.ctypes.data, arr.nbytes) label = pdf[alias.label] if alias.label in pdf.columns else None row_number = ( diff --git a/python/src/spark_rapids_ml/umap.py b/python/src/spark_rapids_ml/umap.py index 9ab47727..d0e8cb15 100644 --- a/python/src/spark_rapids_ml/umap.py +++ b/python/src/spark_rapids_ml/umap.py @@ -96,6 +96,7 @@ _configure_memory_resource, _get_spark_session, _is_local, + _memadvise_cpu, dtype_to_pyspark_type, get_logger, ) @@ -1200,15 +1201,13 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]: ) and not use_sparse_array: # for sam, pin numpy array to host to avoid observed page migration to device during later concatenation if cuda_system_mem_enabled and isinstance(features, np.ndarray): - cp.cuda.runtime.memAdvise( - features.ctypes.data, features.nbytes, 3, -1 - ) + _memadvise_cpu(features.ctypes.data, features.nbytes) features = cp.array(features) # for sam sparse case, pin numpy subarrays to host to avoid observed page migration to device during later concatenation if cuda_system_mem_enabled and isinstance(features, csr_matrix): for arr in [features.data, features.indptr, features.indices]: - cp.cuda.runtime.memAdvise(arr.ctypes.data, arr.nbytes, 3, -1) + _memadvise_cpu(arr.ctypes.data, arr.nbytes) label = pdf[alias.label] if alias.label in pdf.columns else None row_number = ( diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py index 1d36ea37..f6595901 100644 --- a/python/src/spark_rapids_ml/utils.py +++ b/python/src/spark_rapids_ml/utils.py @@ -240,6 +240,37 @@ def _configure_memory_resource( import spark_rapids_ml.numpy_allocator +def _memadvise_cpu(data: Any, nbytes: int) -> None: + """ + Advise data referenced by pointer to stay in cpu memory. + For use with SAM to prevent migration of host memory staged partial matrices to remain on cpu during + gpu concatenation. + """ + import cuda + import cupy as cp + from packaging.version import parse + + # latest cupy 13.6 has a bug in the cuda13 version of the memadvise api so use low level + # python bindings from cuda-python for that case. Once the patch is released, we can revert to using the cupy api. + if parse(cuda.bindings.__version__) < parse("13.0.0"): + cp.cuda.runtime.memAdvise(data, nbytes, 3, -1) + else: + from cuda.bindings.runtime import ( + cudaMemoryAdvise, + cudaMemoryLocation, + cudaMemoryLocationType, + ) + + mem_location = cudaMemoryLocation() + mem_location.type = cudaMemoryLocationType.cudaMemoryLocationHost + cuda.bindings.runtime.memAdvise( + data, + nbytes, + cudaMemoryAdvise.cudaMemAdviseSetPreferredLocation, + mem_location, + ) + + def _get_default_params_from_func( func: Callable, unsupported_set: List[str] = [] ) -> Dict[str, Any]: From b3ca4bf390bb27c40e7ad6e9529fbae8cf4ebbcc Mon Sep 17 00:00:00 2001 From: Erik Ordentlich Date: Tue, 23 Dec 2025 14:33:59 -0800 Subject: [PATCH 2/4] syntax corrections Signed-off-by: Erik Ordentlich --- python/src/spark_rapids_ml/utils.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py index f6595901..7bae2ac3 100644 --- a/python/src/spark_rapids_ml/utils.py +++ b/python/src/spark_rapids_ml/utils.py @@ -256,14 +256,14 @@ def _memadvise_cpu(data: Any, nbytes: int) -> None: cp.cuda.runtime.memAdvise(data, nbytes, 3, -1) else: from cuda.bindings.runtime import ( + cudaMemLocation, + cudaMemLocationType, cudaMemoryAdvise, - cudaMemoryLocation, - cudaMemoryLocationType, ) - mem_location = cudaMemoryLocation() - mem_location.type = cudaMemoryLocationType.cudaMemoryLocationHost - cuda.bindings.runtime.memAdvise( + mem_location = cudaMemLocation() + mem_location.type = cudaMemLocationType.cudaMemLocationHost + cuda.bindings.runtime.cudaMemAdvise( data, nbytes, cudaMemoryAdvise.cudaMemAdviseSetPreferredLocation, From 0d96a36005d79190862117b175aa0375627809ec Mon Sep 17 00:00:00 2001 From: Erik Ordentlich Date: Tue, 23 Dec 2025 14:39:05 -0800 Subject: [PATCH 3/4] syntax Signed-off-by: Erik Ordentlich --- python/src/spark_rapids_ml/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py index 7bae2ac3..7bf44d64 100644 --- a/python/src/spark_rapids_ml/utils.py +++ b/python/src/spark_rapids_ml/utils.py @@ -262,7 +262,7 @@ def _memadvise_cpu(data: Any, nbytes: int) -> None: ) mem_location = cudaMemLocation() - mem_location.type = cudaMemLocationType.cudaMemLocationHost + mem_location.type = cudaMemLocationType.cudaMemLocationTypeHost cuda.bindings.runtime.cudaMemAdvise( data, nbytes, From 7cb564dd2d3cb258c4c8d40287a645f16e9c7608 Mon Sep 17 00:00:00 2001 From: Erik Ordentlich Date: Wed, 24 Dec 2025 10:48:33 -0800 Subject: [PATCH 4/4] address comments Signed-off-by: Erik Ordentlich --- python/src/spark_rapids_ml/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py index 7bf44d64..f8ac301a 100644 --- a/python/src/spark_rapids_ml/utils.py +++ b/python/src/spark_rapids_ml/utils.py @@ -243,7 +243,7 @@ def _configure_memory_resource( def _memadvise_cpu(data: Any, nbytes: int) -> None: """ Advise data referenced by pointer to stay in cpu memory. - For use with SAM to prevent migration of host memory staged partial matrices to remain on cpu during + For use with SAM to prevent migration of partial arrays staged in host memory to device during gpu concatenation. """ import cuda