Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ def env_set_by_user(key):
LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"]

NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"
NOSET_XPU_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_XPU_VISIBLE_DEVICES"
RAY_DEVICE_XPU_SELECTOR_ENV_VAR = "ONEAPI_DEVICE_SELECTOR"
RAY_DEVICE_XPU_BACKEND_TYPE = "level_zero"
RAY_DEVICE_XPU_DEVICE_TYPE = "gpu"
RAY_DEVICE_XPU_AS_GPU = True

RAY_DEVICE_TYPES = {"CPU", "CUDA", "XPU"}

# Default max_retries option in @ray.remote for non-actor
# tasks.
Expand Down
99 changes: 72 additions & 27 deletions python/ray/_private/resource_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import ray
import ray._private.ray_constants as ray_constants

try:
    import dpctl
except ImportError:
    # Missing ":" was a SyntaxError. dpctl is an optional dependency used
    # only for XPU autodetection; bind it to None so later code can check
    # availability instead of hitting a NameError.
    dpctl = None

try:
import GPUtil
except ImportError:
Expand Down Expand Up @@ -157,35 +162,25 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
# ray._private.state.current_node_id().
resources[NODE_ID_PREFIX + node_ip_address] = 1.0

num_cpus = self.num_cpus
if num_cpus is None:
num_cpus = ray._private.utils.get_num_cpus()
# get cpu num
if "CPU" in RAY_DEVICE_TYPES:
num_cpus = self.num_cpus
if num_cpus is None:
num_cpus = ray._private.utils.get_num_cpus()

num_gpus = self.num_gpus
gpu_ids = ray._private.utils.get_cuda_visible_devices()
# Check that the number of GPUs that the raylet wants doesn't
# exceed the amount allowed by CUDA_VISIBLE_DEVICES.
if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
raise ValueError(
"Attempting to start raylet with {} GPUs, "
"but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
)
if num_gpus is None:
# Try to automatically detect the number of GPUs.
num_gpus = _autodetect_num_gpus()
# Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
if gpu_ids is not None:
num_gpus = min(num_gpus, len(gpu_ids))

try:
if importlib.util.find_spec("GPUtil") is not None:
gpu_types = _get_gpu_types_gputil()
else:
info_string = _get_gpu_info_string()
gpu_types = _constraints_from_gpu_info(info_string)
# get cuda gpu num
if "CUDA" in RAY_DEVICE_TYPES:
num_gpus, gpu_types = _get_cuda_info(self.num_gpus)
resources.update(gpu_types)
except Exception:
logger.exception("Could not parse gpu information.")

# get xpu num
if "XPU" in RAY_DEVICE_TYPES:
if os.environ.get(ray_constants.RAY_DEVICE_XPU_AS_GPU, "True"):
# here we take xpu as gpu, so no need to develop core's scheduling policy
# If we don't want to take xpu as gpu, ray core need to develop new scheduling policy
num_xpus, xpu_types = _get_xpu_info(self.num_gpus)
num_gpus += num_xpus
resource.udpate(xpu_types)

# Choose a default object store size.
system_memory = ray._private.utils.get_system_memory()
Expand Down Expand Up @@ -265,6 +260,56 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
return spec


def _get_cuda_info(num_gpus):
    """Resolve the CUDA GPU count and GPU-type resource constraints.

    Args:
        num_gpus: The user-requested GPU count, or None to autodetect.

    Returns:
        A tuple ``(num_gpus, gpu_types)`` where ``gpu_types`` is a
        (possibly empty) resource-constraint dict to merge into the
        node's resources.

    Raises:
        ValueError: If ``num_gpus`` exceeds the count allowed by
            CUDA_VISIBLE_DEVICES.
    """
    gpu_ids = ray._private.utils.get_cuda_visible_devices()
    # Check that the number of GPUs that the raylet wants doesn't
    # exceed the amount allowed by CUDA_VISIBLE_DEVICES.
    if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
        raise ValueError(
            "Attempting to start raylet with {} GPUs, "
            "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
        )
    if num_gpus is None:
        # Try to automatically detect the number of GPUs.
        num_gpus = _autodetect_num_gpus()
        # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
        if gpu_ids is not None:
            num_gpus = min(num_gpus, len(gpu_ids))

    # Default to an empty dict, not "": callers feed this straight into
    # dict.update(), and the empty-string fallback only worked by accident
    # (updating from an empty iterable is a no-op).
    gpu_types = {}
    try:
        if importlib.util.find_spec("GPUtil") is not None:
            gpu_types = _get_gpu_types_gputil()
        else:
            info_string = _get_gpu_info_string()
            gpu_types = _constraints_from_gpu_info(info_string)
    except Exception:
        logger.exception("Could not parse gpu information.")

    return num_gpus, gpu_types


def _get_xpu_info(num_xpus):
    """Resolve the XPU count, treating XPUs as GPU-like devices.

    Args:
        num_xpus: The user-requested XPU count, or None to autodetect
            via dpctl.

    Returns:
        A tuple ``(num_xpus, xpu_types)`` where ``xpu_types`` is a
        resource-constraint dict to merge into the node's resources.

    Raises:
        ValueError: If ``num_xpus`` exceeds the count allowed by
            XPU_VISIBLE_DEVICES.
    """
    xpu_ids = ray._private.utils.get_xpu_visible_devices()
    # The requested count must not exceed what XPU_VISIBLE_DEVICES allows.
    if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids):
        raise ValueError(
            "Attempting to start raylet with {} XPUs, "
            "but XPU_VISIBLE_DEVICES contains {}.".format(num_xpus, xpu_ids)
        )
    if num_xpus is None:
        # Autodetect by enumerating level-zero GPU devices through dpctl.
        # NOTE: these constants live in ray_constants; the bare names used
        # previously were NameErrors at runtime.
        num_xpus = len(
            dpctl.get_devices(
                backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE,
                device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE,
            )
        )
    # Don't use more XPUs than allowed by XPU_VISIBLE_DEVICES.
    if xpu_ids is not None:
        num_xpus = min(num_xpus, len(xpu_ids))
    xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}xpu": 1}
    return num_xpus, xpu_types


def _autodetect_num_gpus():
"""Attempt to detect the number of GPUs on this machine.

Expand Down
33 changes: 33 additions & 0 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,24 @@ def get_cuda_visible_devices():
last_set_gpu_ids = None


def get_xpu_visible_devices():
    """Get the device IDs in the XPU_VISIBLE_DEVICES environment variable.

    Returns:
        None if XPU_VISIBLE_DEVICES is unset; otherwise a list of strings
        with the IDs of the visible XPUs (an empty list if the variable is
        set to the empty string).
    """
    xpu_ids_str = os.environ.get("XPU_VISIBLE_DEVICES", None)
    if xpu_ids_str is None:
        return None

    if xpu_ids_str == "":
        return []

    # str.split already returns a list; no extra list() wrapping needed.
    return xpu_ids_str.split(",")


def set_omp_num_threads_if_unset() -> bool:
"""Set the OMP_NUM_THREADS to default to num cpus assigned to the worker

Expand Down Expand Up @@ -384,6 +402,21 @@ def set_cuda_visible_devices(gpu_ids):
last_set_gpu_ids = gpu_ids


def set_xpu_visible_devices(xpu_ids):
    """Restrict XPU visibility via environment variables.

    Sets both XPU_VISIBLE_DEVICES and the oneAPI device selector so that
    only the given devices are visible to this process.

    Args:
        xpu_ids (List[str]): List of strings representing XPU IDs.
    """
    # Opt-out knob, mirroring set_cuda_visible_devices.
    if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR):
        return

    ids_str = ",".join([str(i) for i in xpu_ids])
    os.environ["XPU_VISIBLE_DEVICES"] = ids_str
    # Constants must be referenced through ray_constants; the bare
    # RAY_DEVICE_XPU_BACKEND_TYPE was a NameError at runtime. Also use the
    # declared selector env-var constant instead of a duplicate literal.
    os.environ[ray_constants.RAY_DEVICE_XPU_SELECTOR_ENV_VAR] = (
        ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str
    )


def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
"""Determine a task's resource requirements.

Expand Down
45 changes: 45 additions & 0 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ def __init__(self):
# When the worker is constructed. Record the original value of the
# CUDA_VISIBLE_DEVICES environment variable.
self.original_gpu_ids = ray._private.utils.get_cuda_visible_devices()
# record the original value of the XPU_VISIBLE_DEVICES environment variable.
self.original_xpu_ids = ray._private.utils.get_xpu_visible_devices()
# A dictionary that maps from driver id to SerializationContext
# TODO: clean up the SerializationContext once the job finished.
self.serialization_context_map = {}
Expand Down Expand Up @@ -950,6 +952,49 @@ def get_gpu_ids():
return assigned_ids


@PublicAPI
@client_mode_hook(auto_init=True)
def get_xpu_ids():
    """Get the IDs of the XPUs that are available to the worker.

    If the XPU_VISIBLE_DEVICES environment variable was set when the
    worker started up, the detected device indices are mapped through
    that original list.

    Returns:
        A list of XPU IDs (empty when called from the driver, or when
        XPU_VISIBLE_DEVICES was not set).
    """
    worker = global_worker
    worker.check_connected()

    if worker.mode != WORKER_MODE:
        # Use an XPU-specific log_once key; reusing the GPU key would
        # suppress this warning if the GPU variant already fired.
        if log_once("worker_get_xpu_ids_empty_from_driver"):
            logger.warning(
                "`ray.get_xpu_ids()` will always return the empty list when "
                "called from the driver. This is because Ray does not manage "
                "XPU allocations to the driver process."
            )
        # Return early so the behavior matches the warning above; the
        # previous code fell through and enumerated devices anyway.
        return []

    # Enumerate devices with dpctl.get_devices. Note that the
    # ONEAPI_DEVICE_SELECTOR environment variable should be unset, or
    # dpctl.get_devices will only return the filtered device set.
    xpu_devices = dpctl.get_devices(
        backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE,
        device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE,
    )
    # filter_string is presumably "level_zero:gpu:<index>" — take the
    # trailing index. Splitting on the "backend:type" prefix (as before)
    # left a leading ":" and int() raised ValueError.
    available_ids = set()
    for device in xpu_devices:
        available_ids.add(int(device.filter_string.split(":")[-1]))

    xpu_ids = []
    if global_worker.original_xpu_ids is not None:
        # Sort for deterministic ordering; set iteration order is not
        # guaranteed.
        xpu_ids = [
            global_worker.original_xpu_ids[i] for i in sorted(available_ids)
        ]

    return xpu_ids

@Deprecated(
message="Use ray.get_runtime_context().get_assigned_resources() instead.",
warning=True,
Expand Down
3 changes: 3 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,9 @@ cdef execute_task_with_cancellation_handler(
# Automatically restrict the GPUs available to this task.
ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids())

# Automatically restrict the XPU device avaialable to this task
ray._private.utils.set_xpu_visible_devices(ray.get_xpu_ids())

# Automatically configure OMP_NUM_THREADS to the assigned CPU number.
# It will be unset after the task execution if it was overwridden here.
# No-op if already set.
Expand Down
7 changes: 7 additions & 0 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,13 @@ def ray_start_10_cpus(request, maybe_external_redis):
yield res


@pytest.fixture
def ray_start_1_cpus_6_xpus(request, maybe_external_redis):
    # Starts a Ray instance with 1 CPU and 6 XPUs for the test's duration.
    # XPUs are exposed through the GPU resource (num_gpus=6) — presumably
    # because XPUs are scheduled as GPUs when RAY_DEVICE_XPU_AS_GPU is
    # enabled; confirm against resource_spec.py.
    param = getattr(request, "param", {})
    with _ray_start(num_cpus=1, num_gpus=6, **param) as res:
        yield res


@contextmanager
def _ray_start_cluster(**kwargs):
cluster_not_supported_ = kwargs.pop("skip_cluster", cluster_not_supported)
Expand Down
Loading