diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py
index d3e21046a693..9dffe6dafd16 100644
--- a/python/ray/_private/ray_constants.py
+++ b/python/ray/_private/ray_constants.py
@@ -378,6 +378,13 @@ def env_set_by_user(key):
 LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"]
 
 NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"
+NOSET_XPU_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_XPU_VISIBLE_DEVICES"
+RAY_DEVICE_XPU_SELECTOR_ENV_VAR = "ONEAPI_DEVICE_SELECTOR"
+RAY_DEVICE_XPU_BACKEND_TYPE = "level_zero"
+RAY_DEVICE_XPU_DEVICE_TYPE = "gpu"
+RAY_DEVICE_XPU_AS_GPU = True
+
+RAY_DEVICE_TYPES = {"CPU", "CUDA", "XPU"}
 
 # Default max_retries option in @ray.remote for non-actor
 # tasks.
diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py
index 21163decf1f8..577b0e76ef3b 100644
--- a/python/ray/_private/resource_spec.py
+++ b/python/ray/_private/resource_spec.py
@@ -10,6 +10,11 @@
 import ray
 import ray._private.ray_constants as ray_constants
 
+try:
+    import dpctl
+except ImportError:
+    pass
+
 try:
     import GPUtil
 except ImportError:
@@ -157,35 +162,25 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
         # ray._private.state.current_node_id().
         resources[NODE_ID_PREFIX + node_ip_address] = 1.0
 
-        num_cpus = self.num_cpus
-        if num_cpus is None:
-            num_cpus = ray._private.utils.get_num_cpus()
+        # get cpu num
+        if "CPU" in ray_constants.RAY_DEVICE_TYPES:
+            num_cpus = self.num_cpus
+            if num_cpus is None:
+                num_cpus = ray._private.utils.get_num_cpus()
 
-        num_gpus = self.num_gpus
-        gpu_ids = ray._private.utils.get_cuda_visible_devices()
-        # Check that the number of GPUs that the raylet wants doesn't
-        # exceed the amount allowed by CUDA_VISIBLE_DEVICES.
-        if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
-            raise ValueError(
-                "Attempting to start raylet with {} GPUs, "
-                "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
-            )
-        if num_gpus is None:
-            # Try to automatically detect the number of GPUs.
-            num_gpus = _autodetect_num_gpus()
-            # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
-            if gpu_ids is not None:
-                num_gpus = min(num_gpus, len(gpu_ids))
-
-        try:
-            if importlib.util.find_spec("GPUtil") is not None:
-                gpu_types = _get_gpu_types_gputil()
-            else:
-                info_string = _get_gpu_info_string()
-                gpu_types = _constraints_from_gpu_info(info_string)
+        # get cuda gpu num
+        if "CUDA" in ray_constants.RAY_DEVICE_TYPES:
+            num_gpus, gpu_types = _get_cuda_info(self.num_gpus)
             resources.update(gpu_types)
-        except Exception:
-            logger.exception("Could not parse gpu information.")
+
+        # get xpu num
+        if "XPU" in ray_constants.RAY_DEVICE_TYPES:
+            if ray_constants.RAY_DEVICE_XPU_AS_GPU:
+                # here we take xpu as gpu, so no need to develop core's scheduling policy
+                # If we don't want to take xpu as gpu, ray core need to develop new scheduling policy
+                num_xpus, xpu_types = _get_xpu_info(self.num_gpus)
+                num_gpus += num_xpus
+                resources.update(xpu_types)
 
         # Choose a default object store size.
         system_memory = ray._private.utils.get_system_memory()
@@ -265,6 +260,56 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
     return spec
 
 
+def _get_cuda_info(num_gpus):
+    gpu_ids = ray._private.utils.get_cuda_visible_devices()
+    # Check that the number of GPUs that the raylet wants doesn't
+    # exceed the amount allowed by CUDA_VISIBLE_DEVICES.
+    if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
+        raise ValueError(
+            "Attempting to start raylet with {} GPUs, "
+            "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
+        )
+    if num_gpus is None:
+        # Try to automatically detect the number of GPUs.
+        num_gpus = _autodetect_num_gpus()
+        # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
+        if gpu_ids is not None:
+            num_gpus = min(num_gpus, len(gpu_ids))
+
+    gpu_types = ""
+    try:
+        if importlib.util.find_spec("GPUtil") is not None:
+            gpu_types = _get_gpu_types_gputil()
+        else:
+            info_string = _get_gpu_info_string()
+            gpu_types = _constraints_from_gpu_info(info_string)
+    except Exception:
+        logger.exception("Could not parse gpu information.")
+
+    return num_gpus, gpu_types
+
+
+def _get_xpu_info(num_xpus):
+    """Attempt to process the number of XPUs as GPUs
+
+    Returns:
+        The number of XPUs
+    """
+    xpu_ids = ray._private.utils.get_xpu_visible_devices()
+    if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids):
+        raise ValueError(
+            "Attempting to start raylet with {} XPUs, "
+            "but XPU_VISIBLE_DEVICES contains {}.".format(num_xpus, xpu_ids)
+        )
+    if num_xpus is None:
+        num_xpus = len(dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE,
+                                         device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE))
+    if xpu_ids is not None:
+        num_xpus = min(num_xpus, len(xpu_ids))
+    xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" "xpu": 1}
+    return num_xpus, xpu_types
+
+
 def _autodetect_num_gpus():
     """Attempt to detect the number of GPUs on this machine.
 
diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py
index 8f72bd4335e8..0e6f51663e99 100644
--- a/python/ray/_private/utils.py
+++ b/python/ray/_private/utils.py
@@ -326,6 +326,24 @@ def get_cuda_visible_devices():
     last_set_gpu_ids = None
 
 
+def get_xpu_visible_devices():
+    """Get the device IDs in the XPU_VISIBLE_DEVICES environment variable.
+
+    Returns:
+        devices (List[str]): If XPU_VISIBLE_DEVICES is set, return a
+            list of strings representing the IDs of the visible XPUs.
+            Returns None if it is unset; an empty string yields [].
+ """ + xpu_ids_str = os.environ.get("XPU_VISIBLE_DEVICES", None) + if xpu_ids_str is None: + return None + + if xpu_ids_str == "": + return [] + + return list(xpu_ids_str.split(",")) + + def set_omp_num_threads_if_unset() -> bool: """Set the OMP_NUM_THREADS to default to num cpus assigned to the worker @@ -384,6 +402,21 @@ def set_cuda_visible_devices(gpu_ids): last_set_gpu_ids = gpu_ids +def set_xpu_visible_devices(xpu_ids): + """Set the ONEAPI_DEVICE_SELECTOR environment variable. + + Args: + xpu_ids (List[str]): List of strings representing GPU IDs + """ + + if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR): + return + + ids_str = ",".join([str(i) for i in xpu_ids]) + os.environ["XPU_VISIBLE_DEVICES"] = ids_str + os.environ["ONEAPI_DEVICE_SELECTOR"] = RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str + + def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]: """Determine a task's resource requirements. diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 161dafa34fba..d57722d35e2f 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -434,6 +434,8 @@ def __init__(self): # When the worker is constructed. Record the original value of the # CUDA_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray._private.utils.get_cuda_visible_devices() + # record the original value of the XPU_VISIBLE_DEVICES environment variable. + self.original_xpu_ids = ray._private.utils.get_xpu_visible_devices() # A dictionary that maps from driver id to SerializationContext # TODO: clean up the SerializationContext once the job finished. self.serialization_context_map = {} @@ -950,6 +952,49 @@ def get_gpu_ids(): return assigned_ids +@PublicAPI +@client_mode_hook(auto_init=True) +def get_xpu_ids(): + """ Get the IDs of the XPUs that are available to the worker. 
+
+    If the XPU_VISIBLE_DEVICES environment variable was set when the worker
+    started up, the returned IDs refer to entries of that original list.
+
+    Returns:
+        A list of XPU IDs
+    """
+    worker = global_worker
+    worker.check_connected()
+
+    if worker.mode != WORKER_MODE:
+        if log_once("worker_get_gpu_ids_empty_from_driver"):
+            logger.warning(
+                "`ray.get_xpu_ids()` will always return the empty list when "
+                "called from the driver. This is because Ray does not manage "
+                "XPU allocations to the driver process."
+            )
+    # Here we use `dpctl` to detect XPU device:
+    # Enumerate all devices by API dpctl.get_devices
+    # Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset
+    # Or dpctl.get_devices will only return filtered device set by ONEAPI_DEVICE_SELECTOR
+    # Another method to enumerate XPU device is to use C++ API, maybe can upgrade later
+
+    xpu_devices = dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE,
+                                    device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE)
+    xpu_ava_ids = set()
+    xpu_dev_prefix = f"{ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE}:{ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE}:"
+    for xpu_dev in xpu_devices:
+        xpu_id = int(xpu_dev.filter_string.split(xpu_dev_prefix)[1])
+        xpu_ava_ids.add(xpu_id)
+
+    xpu_ids = []
+    if global_worker.original_xpu_ids is not None:
+        xpu_ids = [
+            global_worker.original_xpu_ids[xpu_id] for xpu_id in xpu_ava_ids
+        ]
+
+    return xpu_ids
+
 @Deprecated(
     message="Use ray.get_runtime_context().get_assigned_resources() instead.",
     warning=True,
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index c34179589cec..76e876296547 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -1047,6 +1047,9 @@ cdef execute_task_with_cancellation_handler(
         # Automatically restrict the GPUs available to this task.
ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids()) + # Automatically restrict the XPU device avaialable to this task + ray._private.utils.set_xpu_visible_devices(ray.get_xpu_ids()) + # Automatically configure OMP_NUM_THREADS to the assigned CPU number. # It will be unset after the task execution if it was overwridden here. # No-op if already set. diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index cdc049ca860f..6534e4b2293c 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -354,6 +354,13 @@ def ray_start_10_cpus(request, maybe_external_redis): yield res +@pytest.fixture +def ray_start_1_cpus_6_xpus(request, maybe_external_redis): + param = getattr(request, "param", {}) + with _ray_start(num_cpus=1, num_gpus=6, **param) as res: + yield res + + @contextmanager def _ray_start_cluster(**kwargs): cluster_not_supported_ = kwargs.pop("skip_cluster", cluster_not_supported) diff --git a/python/ray/tests/test_basic_xpu.py b/python/ray/tests/test_basic_xpu.py new file mode 100644 index 000000000000..58ec4c88361f --- /dev/null +++ b/python/ray/tests/test_basic_xpu.py @@ -0,0 +1,193 @@ +# coding: utf-8 +import io +import logging +import os +import subprocess +import sys +import tempfile +import threading +import time +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +import dpctl +import dpctl.program as dpctl_prog + +import torch +import torchvision +import intel_extension_for_pytorch + +from ray._private.ray_constants import KV_NAMESPACE_FUNCTION_TABLE +from ray._private.test_utils import client_test_enabled +from ray.cluster_utils import Cluster, cluster_not_supported +from ray.exceptions import GetTimeoutError, RayTaskError +from ray.tests.client_test_utils import create_remote_signal_actor + +if client_test_enabled(): + from ray.util.client import ray +else: + import ray + +logger = logging.getLogger(__name__) + + +def get_spirv_abspath(fn): + curr_dir = 
os.path.dirname(os.path.abspath(__file__)) + spirv_file = os.path.join(curr_dir, "xpu_input_files", fn) + return spirv_file + + +@ray.remote(num_cpus=0, num_gpus=1) +def gpu_func(): + try: + q = dpctl.SyclQueue("level_zero", property="enable_profiling") + except dpctl.SyclQueueCreationError: + pytest.skip("No Level-zero queue is available") + spirv_file = get_spirv_abspath("multi_kernel.spv") + with open(spirv_file, "rb") as fin: + spirv = fin.read() + + prog = dpctl_prog.create_program_from_spirv(q, spirv) + + assert type(prog) is dpctl_prog.SyclProgram + assert type(prog.addressof_ref()) is int + assert prog.has_sycl_kernel("add") + assert prog.has_sycl_kernel("axpy") + + addKernel = prog.get_sycl_kernel("add") + axpyKernel = prog.get_sycl_kernel("axpy") + + assert "add" == addKernel.get_function_name() + assert "axpy" == axpyKernel.get_function_name() + assert 3 == addKernel.get_num_args() + assert 4 == axpyKernel.get_num_args() + assert type(addKernel.addressof_ref()) is int + assert type(axpyKernel.addressof_ref()) is int + + +def test_basic_xpu(shutdown_only): + ray.init(num_cpus=0, num_gpus=1) + + future = gpu_func.remote() + + start = time.time() + ready, not_ready = ray.wait([future], timeout=1) + # assert 0.2 < time.time() - start < 0.3 + assert len(ready) == 0 + assert len(not_ready) == 1 + + +def to_str(*args, **kwargs): + output = io.StringIO() + print(*args, file=output, **kwargs) + contents = output.getvalue() + output.close() + return contents.rstrip() + + +@ray.remote(num_cpus=1, num_gpus=1) +def prod_func(): + input = torch.randn(4, dtype=torch.float32, device=torch.device("cpu")) + c1 = to_str(input) + c2 = to_str(torch.prod(input)) + + input_dpcpp = input.to("xpu") + g1 = to_str(input_dpcpp.cpu()) + g2 = to_str(torch.prod(input_dpcpp).cpu()) + + return (c1, c2, g1, g2) + + +def test_basic_prod(shutdown_only): + ray.init(num_cpus=1, num_gpus=1) + job = prod_func.remote() + + res = ray.get(job) + + print(f"res = {res}") + + assert res[0] == res[2] 
+ assert res[1] == res[3] + + +def test_nms(shutdown_only): + ray.init(num_cpus=1, num_gpus=1) + + @ray.remote(num_cpus=1, num_gpus=1) + def nms_func(): + box = torch.FloatTensor([[2, 3.1, 1, 7], [3, 4, 8, 4.8], [4, 4, 5.6, 7], + [0.1, 0, 8, 1], [4, 4, 5.7, 7.2]]).xpu() + score = torch.FloatTensor([0.5, 0.3, 0.2, 0.4, 0.3]).xpu() + out_ref = torch.LongTensor([0, 3, 1, 4]) + out = torchvision.ops.nms(box, score, 0.3) + return (to_str(out.cpu()), to_str(out_ref)) + + job = nms_func.remote() + res = ray.get(job) + print(f"in test_nms, res = {res}") + assert res[0] == res[1] + + +def test_batched_nms(shutdown_only): + ray.init(num_cpus=1, num_gpus=1) + @ray.remote(num_cpus=1, num_gpus=1) + def batched_nms_func(): + box1 = torch.FloatTensor([[2, 3.1, 1, 7], [3, 4, 8, 4.8], [4, 4, 5.6, 7], + [0.1, 0, 8, 1], [4, 4, 5.7, 7.2]]) + score1 = torch.FloatTensor([0.5, 0.3, 0.2, 0.4, 0.3]) + idx1 = torch.LongTensor([2,1,3,4,0]) + box2 = torch.FloatTensor([[2, 3.1, 1, 5], [3, 4, 8, 4.8], [4, 4, 5.6, 7], + [0.1, 0, 6, 1], [4, 4, 5.7, 7.2]]) + score2 = torch.FloatTensor([0.5, 0.1, 0.2, 0.4, 0.8]) + idx2 = torch.LongTensor([0,1,2,4,3]) + boxes = torch.cat([box1, box2], dim=0).xpu() + scores = torch.cat([score1, score2], dim=0).xpu() + idxs = torch.cat([idx1, idx2], dim=0).xpu() + out = torchvision.ops.batched_nms(boxes, scores, idxs, 0.3) + out_ref = torch.LongTensor([9, 0, 5, 3, 1, 4, 7]) + return (to_str(out.cpu()), to_str(out_ref)) + job = batched_nms_func.remote() + res = ray.get(job) + + print(f"in test_batched_nms, res = {res}") + assert res[0] == res[1] + +""" +def test_linear(shutdown_only): + ray.init(num_cpus=1, num_gpus=1) + @ray.remote(num_cpus=1, num_gpus=0) + def cpu_task_func(): + device = torch.device("cpu:0") + x = torch.tensor([[1, 2, 3, 4, 5], + [2, 3, 4, 5, 6], + [3, 4, 5, 6, 7], + [4, 5, 6, 7, 8], + [5, 6, 7, 8, 9]], + dtype=torch.float, + device=device) + l = torch.nn.Linear(5, 5).to(device, torch.float) + r = l(x) + return to_str(r) + + @ray.remote(num_cpus=0, 
num_xpus=1)
+    def xpu_task_func():
+        device = torch.device("xpu:0")
+        x = torch.tensor([[1, 2, 3, 4, 5],
+                          [2, 3, 4, 5, 6],
+                          [3, 4, 5, 6, 7],
+                          [4, 5, 6, 7, 8],
+                          [5, 6, 7, 8, 9]],
+                         dtype=torch.float,
+                         device=device)
+        l = torch.nn.Linear(5, 5).to(device, torch.float)
+        r = l(x)
+        return to_str(r)
+
+    jobs = [cpu_task_func.remote(), xpu_task_func.remote()]
+    res = ray.get(jobs)
+
+    print(to_str(res))
+    assert res[0] == res[1]
+"""
diff --git a/python/ray/tests/test_runtime_context.py b/python/ray/tests/test_runtime_context.py
index 42b7b5fed42e..1a58301a0d02 100644
--- a/python/ray/tests/test_runtime_context.py
+++ b/python/ray/tests/test_runtime_context.py
@@ -148,6 +148,35 @@ def check():
     assert ray.get(result)["CPU"] == 2.0
 
 
+def test_get_assigned_resources_gpu(ray_start_1_cpus_6_xpus):
+    # XPU devices are exposed as GPU resources (ray_constants.RAY_DEVICE_XPU_AS_GPU).
+    @ray.remote
+    class Echo:
+        def check(self):
+            return ray.get_runtime_context().get_assigned_resources()
+
+    e = Echo.remote()
+    result = e.check.remote()
+    print(ray.get(result))
+    assert ray.get(result).get("GPU") is None
+    ray.kill(e)
+
+    e = Echo.options(num_gpus=4).remote()
+    result = e.check.remote()
+    assert ray.get(result)["GPU"] == 4.0
+    ray.kill(e)
+
+    @ray.remote
+    def check():
+        return ray.get_runtime_context().get_assigned_resources()
+
+    result = check.remote()
+    assert ray.get(result)["GPU"] == 1.0
+
+    result = check.options(num_gpus=2).remote()
+    assert ray.get(result)["GPU"] == 2.0
+
+
 def test_actor_stats_normal_task(ray_start_regular):
     # Because it works at the core worker level, this API works for tasks.
     @ray.remote