Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions python/src/spark_rapids_ml/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
_concat_and_free,
_configure_memory_resource,
_get_spark_session,
_memadvise_cpu,
get_logger,
java_uid,
)
Expand Down Expand Up @@ -974,9 +975,7 @@ def _cuml_fit(
if cuda_managed_mem_enabled or cuda_system_mem_enabled:
# for sam, pin numpy array to host to avoid observed page migration to device during later concatenation
if cuda_system_mem_enabled and isinstance(features, np.ndarray):
cp.cuda.runtime.memAdvise(
features.ctypes.data, features.nbytes, 3, -1
)
_memadvise_cpu(features.ctypes.data, features.nbytes)
features = cp.array(features)

inputs.append(features)
Expand Down
9 changes: 3 additions & 6 deletions python/src/spark_rapids_ml/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
_get_spark_session,
_is_local,
_is_standalone_or_localcluster,
_memadvise_cpu,
_SingleNpArrayBatchType,
_SinglePdDataFrameBatchType,
dtype_to_pyspark_type,
Expand Down Expand Up @@ -922,18 +923,14 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
# for sam, pin numpy array to host to avoid observed page migration to device during later concatenation
# TODO: revise for non-concatenating algos PCA and LinearRegression which should advise these to device as much as possible
if cuda_system_mem_enabled and isinstance(features, np.ndarray):
cp.cuda.runtime.memAdvise(
features.ctypes.data, features.nbytes, 3, -1
)
_memadvise_cpu(features.ctypes.data, features.nbytes)
features = cp.array(features)

# for sam sparse case, pin numpy subarrays to host to avoid observed page migration to device during later concatenation
# TODO: revise for non-concatenating algos PCA and LinearRegression
if cuda_system_mem_enabled and isinstance(features, csr_matrix):
for arr in [features.data, features.indptr, features.indices]:
cp.cuda.runtime.memAdvise(
arr.ctypes.data, arr.nbytes, 3, -1
)
_memadvise_cpu(arr.ctypes.data, arr.nbytes)

label = pdf[alias.label] if alias.label in pdf.columns else None
row_number = (
Expand Down
7 changes: 3 additions & 4 deletions python/src/spark_rapids_ml/umap.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
_configure_memory_resource,
_get_spark_session,
_is_local,
_memadvise_cpu,
dtype_to_pyspark_type,
get_logger,
)
Expand Down Expand Up @@ -1200,15 +1201,13 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
) and not use_sparse_array:
# for sam, pin numpy array to host to avoid observed page migration to device during later concatenation
if cuda_system_mem_enabled and isinstance(features, np.ndarray):
cp.cuda.runtime.memAdvise(
features.ctypes.data, features.nbytes, 3, -1
)
_memadvise_cpu(features.ctypes.data, features.nbytes)
features = cp.array(features)

# for sam sparse case, pin numpy subarrays to host to avoid observed page migration to device during later concatenation
if cuda_system_mem_enabled and isinstance(features, csr_matrix):
for arr in [features.data, features.indptr, features.indices]:
cp.cuda.runtime.memAdvise(arr.ctypes.data, arr.nbytes, 3, -1)
_memadvise_cpu(arr.ctypes.data, arr.nbytes)

label = pdf[alias.label] if alias.label in pdf.columns else None
row_number = (
Expand Down
31 changes: 31 additions & 0 deletions python/src/spark_rapids_ml/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,37 @@ def _configure_memory_resource(
import spark_rapids_ml.numpy_allocator


def _memadvise_cpu(data: Any, nbytes: int) -> None:
    """
    Advise the CUDA unified/system memory region referenced by pointer to stay in cpu memory.

    For use with SAM to prevent migration of partial arrays staged in host memory to device during
    gpu concatenation.

    Parameters
    ----------
    data
        Integer pointer to the start of the region (e.g. ``ndarray.ctypes.data``).
    nbytes
        Size of the region in bytes.
    """
    # Import the submodule explicitly: a bare `import cuda` does not guarantee that the
    # `cuda.bindings` attribute is populated, so `cuda.bindings.__version__` below could
    # raise AttributeError.
    import cuda.bindings
    import cupy as cp
    from packaging.version import parse

    # latest cupy 13.6 has a bug in the cuda13 version of the memadvise api so use low level
    # python bindings from cuda-python for that case. Once the patch is released, we can revert to using the cupy api.
    if parse(cuda.bindings.__version__) < parse("13.0.0"):
        # 3 == cudaMemAdviseSetPreferredLocation, -1 == cudaCpuDeviceId
        cp.cuda.runtime.memAdvise(data, nbytes, 3, -1)
    else:
        from cuda.bindings.runtime import (
            cudaError_t,
            cudaMemAdvise,
            cudaMemLocation,
            cudaMemLocationType,
            cudaMemoryAdvise,
        )

        # NOTE(review): mem_location.id is left at its default; for a host-type location
        # the device id field is not used — confirm against cuda-python docs if NUMA
        # placement ever matters here.
        mem_location = cudaMemLocation()
        mem_location.type = cudaMemLocationType.cudaMemLocationTypeHost
        # cuda-python runtime bindings return the status as the first tuple element;
        # surface failures instead of silently discarding the error code.
        (err,) = cudaMemAdvise(
            data,
            nbytes,
            cudaMemoryAdvise.cudaMemAdviseSetPreferredLocation,
            mem_location,
        )
        if err != cudaError_t.cudaSuccess:
            raise RuntimeError(f"cudaMemAdvise failed with error: {err}")


def _get_default_params_from_func(
func: Callable, unsupported_set: List[str] = []
) -> Dict[str, Any]:
Expand Down