From 040a39c5899795263e1c1e8530ed6a50807c866a Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Mon, 22 May 2023 17:28:25 +0800 Subject: [PATCH 01/10] enabling intel gpu --- python/ray/_private/resource_spec.py | 65 ++++++++++++++++++---------- python/ray/_private/utils.py | 8 ++++ python/ray/train/tests/test_xpu.py | 59 +++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 24 deletions(-) create mode 100644 python/ray/train/tests/test_xpu.py diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 21163decf1f8..b9646e3ef790 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -4,6 +4,7 @@ import re import subprocess import sys +import dpctl from collections import namedtuple from typing import Optional @@ -157,35 +158,16 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): # ray._private.state.current_node_id(). resources[NODE_ID_PREFIX + node_ip_address] = 1.0 + # get cpu num num_cpus = self.num_cpus if num_cpus is None: num_cpus = ray._private.utils.get_num_cpus() - num_gpus = self.num_gpus - gpu_ids = ray._private.utils.get_cuda_visible_devices() - # Check that the number of GPUs that the raylet wants doesn't - # exceed the amount allowed by CUDA_VISIBLE_DEVICES. - if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids): - raise ValueError( - "Attempting to start raylet with {} GPUs, " - "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids) - ) + # get gpu num + num_gpus, gpu_types = _get_cuda_info(self.num_gpus) if num_gpus is None: - # Try to automatically detect the number of GPUs. - num_gpus = _autodetect_num_gpus() - # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. - if gpu_ids is not None: - num_gpus = min(num_gpus, len(gpu_ids)) - - try: - if importlib.util.find_spec("GPUtil") is not None: - gpu_types = _get_gpu_types_gputil() - else: - info_string = _get_gpu_info_string() - gpu_types = _constraints_from_gpu_info(info_string) - resources.update(gpu_types) - except Exception: - logger.exception("Could not parse gpu information.") + num_gpus, gpu_types = _get_xpu_info(self.num_gpus) + resources.update(gpu_types) # Choose a default object store size. system_memory = ray._private.utils.get_system_memory() @@ -265,6 +247,41 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): return spec +def _get_cuda_info(num_gpus): + gpu_ids = ray._private.utils.get_cuda_visible_devices() + # Check that the number of GPUs that the raylet wants doesn't + # exceed the amount allowed by CUDA_VISIBLE_DEVICES. + if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids): + raise ValueError( + "Attempting to start raylet with {} GPUs, " + "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids) + ) + if num_gpus is None: + # Try to automatically detect the number of GPUs. + num_gpus = _autodetect_num_gpus() + # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. + if gpu_ids is not None: + num_gpus = min(num_gpus, len(gpu_ids)) + + gpu_types = "" + try: + if importlib.util.find_spec("GPUtil") is not None: + gpu_types = _get_gpu_types_gputil() + else: + info_string = _get_gpu_info_string() + gpu_types = _constraints_from_gpu_info(info_string) + except Exception: + logger.exception("Could not parse gpu information.") + + return num_gpus, gpu_types + + +def _get_xpu_info(num_gpus): + num_gpus = min(num_gpus, len(dpctl.get_devices(backend="level_zero", device_type="gpu"))) + gpu_types = "gpu" + return num_gpus, gpu_types + + def _autodetect_num_gpus(): """Attempt to detect the number of GPUs on this machine. diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 8f72bd4335e8..8ce29e06427e 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -384,6 +384,14 @@ def set_cuda_visible_devices(gpu_ids): last_set_gpu_ids = gpu_ids +def set_oneapi_device_selector(info): + """Set the ONEAPI_DEVICE_SELECTOR environment variable. + Args: + The xpu device selection for one api. + """ + os.environ["ONEAPI_DEVICE_SELECTOR"] = info + + def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]: """Determine a task's resource requirements. diff --git a/python/ray/train/tests/test_xpu.py b/python/ray/train/tests/test_xpu.py new file mode 100644 index 000000000000..6fad61bce6fc --- /dev/null +++ b/python/ray/train/tests/test_xpu.py @@ -0,0 +1,59 @@ +import os +import time + +from unittest.mock import patch +import pytest +import numpy as np +import torch +import torchvision +from torch.nn.parallel import DistributedDataParallel +from torch.utils.data import DataLoader, DistributedSampler + +import ray +import ray.data +from ray.exceptions import RayTaskError +from ray.air import session +from ray import tune + +import ray.train as train +from ray.air.config import ScalingConfig +from ray.train.constants import DEFAULT_NCCL_SOCKET_IFNAME +from ray.train.examples.pytorch.torch_linear_example import LinearDataset +from ray.train.torch.config import TorchConfig, _TorchBackend +from ray.train.torch.torch_trainer import TorchTrainer +from ray.train.trainer import TrainingFailedError +from ray.train._internal.worker_group import WorkerGroup + + +try: + import intel_extension_for_pytorch as ipex +except ImportError: + pass + + +def test_torch_prepare_model_uses_device(ray_start_4_cpus_2_gpus): + """Tests if `prepare_model` uses the train.torch.get_device even if it does not + match with the local rank.""" + # The below test should pass without errors. + + @patch.object( + ray.train.torch.train_loop_utils, + "get_device", + lambda: torch.device(f"cuda:{1 - session.get_local_rank()}"), + ) + def train_func(): + # These assert statements must hold for prepare_model to wrap with DDP. + ######## code changes ####### + model = torch.nn.Linear(1, 1) + model = model.to("xpu") + data = torch.ones(1) + data = data.to("xpu") + model = ipex.optimize(model) + ######## code changes ####### + model = train.torch.prepare_model(model) + model(data) + + trainer = TorchTrainer( + train_func, scaling_config=ScalingConfig(num_workers=2, use_gpu=True) + ) + trainer.fit() From 3b9648720b32ebe6f65e0638a1f714317d544cd2 Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Thu, 25 May 2023 18:45:40 +0800 Subject: [PATCH 02/10] update --- python/ray/_private/ray_constants.py | 3 +++ python/ray/_private/resource_spec.py | 15 ++++++++------- python/ray/_private/utils.py | 17 +++++++++++++++++ python/ray/_private/worker.py | 5 +++++ python/ray/_raylet.pyx | 3 +++ 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index d3e21046a693..590d3b343e35 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -378,6 +378,9 @@ def env_set_by_user(key): LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"] NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES" +NOSET_XPU_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_XPU_VISIBLE_DEVICES" +XPU_BACKEND = "level_zero" +XPU_DEVICE_TYPE = "gpu" # Default max_retries option in @ray.remote for non-actor # tasks. diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index b9646e3ef790..39a2f4677349 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -163,12 +163,14 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if num_cpus is None: num_cpus = ray._private.utils.get_num_cpus() - # get gpu num + # get cuda gpu num num_gpus, gpu_types = _get_cuda_info(self.num_gpus) - if num_gpus is None: - num_gpus, gpu_types = _get_xpu_info(self.num_gpus) resources.update(gpu_types) + # add xpu num, xpu includes [cpu, gpu, fpga] + # and device num can be filtered by backend and device_type + num_gpus += _get_num_xpus() + # Choose a default object store size. system_memory = ray._private.utils.get_system_memory() avail_memory = ray._private.utils.estimate_available_memory() @@ -276,10 +278,9 @@ def _get_cuda_info(num_gpus): return num_gpus, gpu_types -def _get_xpu_info(num_gpus): - num_gpus = min(num_gpus, len(dpctl.get_devices(backend="level_zero", device_type="gpu"))) - gpu_types = "gpu" - return num_gpus, gpu_types +def _get_num_xpus(): + num_gpus = len(dpctl.get_devices(backend=XPU_BACKEND, device_type=XPU_DEVICE_TYPE))) + return num_gpus def _autodetect_num_gpus(): diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 8ce29e06427e..1c22eb132dfa 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -384,6 +384,23 @@ def set_cuda_visible_devices(gpu_ids): last_set_gpu_ids = gpu_ids +def set_xpu_visible_devices(xpu_infos): + """Set the ONEAPI_DEVICE_SELECTOR environment variable. + + Args: + num_gpus: Count of GPU that visiable. + """ + + if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR): + return + + # TODO check correctly way to construct the ONEAPI_DEVICE_SELECTOR + selection = "" + for info in xpu_infos: + selection.append(f"{info.backend}:{info.device_type};}") + os.environ["ONEAPI_DEVICE_SELECTOR"] = selection + + def set_oneapi_device_selector(info): """Set the ONEAPI_DEVICE_SELECTOR environment variable. Args: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 161dafa34fba..64872d0d6dfc 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -949,6 +949,11 @@ def get_gpu_ids(): return assigned_ids +@PublicAPI +@client_mode_hook(auto_init=True) +def get_xpu_infos(): + pass + @Deprecated( message="Use ray.get_runtime_context().get_assigned_resources() instead.", diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index c34179589cec..c4ecddbf0423 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1047,6 +1047,9 @@ cdef execute_task_with_cancellation_handler( # Automatically restrict the GPUs available to this task. ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids()) + # Automatically restrict the XPU device avaialable to this task + ray._private.utils.set_xpu_visible_devices(ray.get_xpu_infos()) + # Automatically configure OMP_NUM_THREADS to the assigned CPU number. # It will be unset after the task execution if it was overwridden here. # No-op if already set. From 6a7d29b02fb2e7d6c56f407fb2c95369f2067fce Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Thu, 25 May 2023 19:00:36 +0800 Subject: [PATCH 03/10] update --- python/ray/_private/utils.py | 8 -- python/ray/tests/test_basic_xpu.py | 70 ++++++++++++++++++ .../tests/xpu_input_files/multi_kernel.spv | Bin 0 -> 1412 bytes 3 files changed, 70 insertions(+), 8 deletions(-) create mode 100644 python/ray/tests/test_basic_xpu.py create mode 100644 python/ray/tests/xpu_input_files/multi_kernel.spv diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 1c22eb132dfa..5663f2d2a898 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -401,14 +401,6 @@ def set_xpu_visible_devices(xpu_infos): os.environ["ONEAPI_DEVICE_SELECTOR"] = selection -def set_oneapi_device_selector(info): - """Set the ONEAPI_DEVICE_SELECTOR environment variable. - Args: - The xpu device selection for one api. - """ - os.environ["ONEAPI_DEVICE_SELECTOR"] = info - - def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]: """Determine a task's resource requirements. diff --git a/python/ray/tests/test_basic_xpu.py b/python/ray/tests/test_basic_xpu.py new file mode 100644 index 000000000000..3ee7ab8caa74 --- /dev/null +++ b/python/ray/tests/test_basic_xpu.py @@ -0,0 +1,70 @@ +# coding: utf-8 +import logging +import os +import subprocess +import sys +import tempfile +import threading +import time +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +from ray._private.ray_constants import KV_NAMESPACE_FUNCTION_TABLE +from ray._private.test_utils import client_test_enabled +from ray.cluster_utils import Cluster, cluster_not_supported +from ray.exceptions import GetTimeoutError, RayTaskError +from ray.tests.client_test_utils import create_remote_signal_actor + +if client_test_enabled(): + from ray.util.client import ray +else: + import ray + +logger = logging.getLogger(__name__) + + +def get_spirv_abspath(fn): + curr_dir = os.path.dirname(os.path.abspath(__file__)) + spirv_file = os.path.join(curr_dir, "xpu_input_files", fn) + return spirv_file + + +@ray.remote(num_cpus=0, num_gpus=1) +def gpu_func(): + try: + q = dpctl.SyclQueue("level_zero", property="enable_profiling") + except dpctl.SyclQueueCreationError: + pytest.skip("No Level-zero queue is available") + spirv_file = get_spirv_abspath("multi_kernel.spv") + with open(spirv_file, "rb") as fin: + spirv = fin.read() + + prog = dpctl_prog.create_program_from_spirv(q, spirv) + + assert type(prog) is dpctl_prog.SyclProgram + assert type(prog.addressof_ref()) is int + assert prog.has_sycl_kernel("add") + assert prog.has_sycl_kernel("axpy") + + addKernel = prog.get_sycl_kernel("add") + axpyKernel = prog.get_sycl_kernel("axpy") + + assert "add" == addKernel.get_function_name() + assert "axpy" == axpyKernel.get_function_name() + assert 3 == addKernel.get_num_args() + assert 4 == axpyKernel.get_num_args() + assert type(addKernel.addressof_ref()) is int + assert type(axpyKernel.addressof_ref()) is int + +def test_init_gpu(shutdown_only): + ray.init(num_cpus=0, num_gpus=1) + + future = gpu_func.remote() + + start = time.time() + ready, not_ready = ray.wait([future], timeout=1) + assert 0.2 < time.time() - start < 0.3 + assert len(ready) == 0 + assert len(not_ready) == 1 diff --git a/python/ray/tests/xpu_input_files/multi_kernel.spv b/python/ray/tests/xpu_input_files/multi_kernel.spv new file mode 100644 index 0000000000000000000000000000000000000000..1fa772d8dfd583970caa6666704fe5d7434f1dda GIT binary patch literal 1412 zcma)*+iDY06oywjX;ZB+)>`Y?Ne>=M3D^r2L@5e}h#)?}Fl~Yar;{+9miEpIAKDwi z|JySw2FZmx?5y*@4tq_sxZN~cu&3hk-*@53S=W=UFnOUld1u&cQ4C@=T+ z|NdFiTJ~(7ukikLR@go{qVO|2Y(>gPHk*F+~}F&bP{^7>SM5prxu zu4dG*+f&;?u-(O`OD*?AJ^y|2RsP5xh_C8x*hBHNwLHderj}UpY%N<;xvFJH^2u2i z^6iTEx}Hq^4y>mhpG!u^Ec<$KWc0|)jGB6?8JV7XdN+9d{%7{=)+bvD{@^itX3jjA z1O0C(ANzKw%R$y6Ta2KyDLE3w2O3Lmh=Ps0E(`mJenph}i0?>yMHCx)za3dO=je%f zQ)7bx{!5_|&;E13d!?*k(U0@=*qo##!R=WXH1%^oom_S$FiNc_B+uhn46iboH=WAcsA517me89&GKIsE~VDR=Pz literal 0 HcmV?d00001 From d621071813d1839dfe38182433f1550bebc7e02b Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Thu, 25 May 2023 22:57:03 +0800 Subject: [PATCH 04/10] update --- python/ray/_private/utils.py | 6 +++--- python/ray/_private/worker.py | 4 ++-- python/ray/_raylet.pyx | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 5663f2d2a898..9f7e95ca0c49 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -384,11 +384,11 @@ def set_cuda_visible_devices(gpu_ids): last_set_gpu_ids = gpu_ids -def set_xpu_visible_devices(xpu_infos): +def set_xpu_visible_devices(sycl_devices : List[dpctl.SyclDevice]): """Set the ONEAPI_DEVICE_SELECTOR environment variable. Args: - num_gpus: Count of GPU that visiable. + sycl_devices: List of sycl device. """ if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR): @@ -396,7 +396,7 @@ def set_xpu_visible_devices(xpu_infos): # TODO check correctly way to construct the ONEAPI_DEVICE_SELECTOR selection = "" - for info in xpu_infos: + for info in sycl_devices: selection.append(f"{info.backend}:{info.device_type};}") os.environ["ONEAPI_DEVICE_SELECTOR"] = selection diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 64872d0d6dfc..738116e57598 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -951,8 +951,8 @@ def get_gpu_ids(): @PublicAPI @client_mode_hook(auto_init=True) -def get_xpu_infos(): - pass +def get_xpu_devices(): + return dpctl.get_devices(backend=XPU_BACKEDN, device_type=XPU_DEVICE_TYPE) @Deprecated( diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index c4ecddbf0423..ec5143c7264d 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1048,7 +1048,7 @@ cdef execute_task_with_cancellation_handler( ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids()) # Automatically restrict the XPU device avaialable to this task - ray._private.utils.set_xpu_visible_devices(ray.get_xpu_infos()) + ray._private.utils.set_xpu_visible_devices(ray.get_xpu_devices()) # Automatically configure OMP_NUM_THREADS to the assigned CPU number. # It will be unset after the task execution if it was overwridden here. From f2466287598fbe5d41e172207cccc10afb61beab Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Tue, 30 May 2023 16:27:10 +0800 Subject: [PATCH 05/10] update ut --- python/ray/tests/test_basic_xpu.py | 90 +++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 2 deletions(-) diff --git a/python/ray/tests/test_basic_xpu.py b/python/ray/tests/test_basic_xpu.py index 3ee7ab8caa74..ef7eac737612 100644 --- a/python/ray/tests/test_basic_xpu.py +++ b/python/ray/tests/test_basic_xpu.py @@ -1,4 +1,5 @@ # coding: utf-8 +import io import logging import os import subprocess @@ -11,6 +12,13 @@ import numpy as np import pytest +import dpctl +import dpctl.program as dpctl_prog + +import torch +import torchvision +import intel_extension_for_pytorch + from ray._private.ray_constants import KV_NAMESPACE_FUNCTION_TABLE from ray._private.test_utils import client_test_enabled from ray.cluster_utils import Cluster, cluster_not_supported @@ -58,13 +66,91 @@ def gpu_func(): assert type(addKernel.addressof_ref()) is int assert type(axpyKernel.addressof_ref()) is int -def test_init_gpu(shutdown_only): + +def test_basic_xpu(shutdown_only): ray.init(num_cpus=0, num_gpus=1) future = gpu_func.remote() start = time.time() ready, not_ready = ray.wait([future], timeout=1) - assert 0.2 < time.time() - start < 0.3 + # assert 0.2 < time.time() - start < 0.3 assert len(ready) == 0 assert len(not_ready) == 1 + + +def to_str(*args, **kwargs): + output = io.StringIO() + print(*args, file=output, **kwargs) + contents = output.getvalue() + output.close() + return contents.rstrip() + + +@ray.remote(num_cpus=1, num_gpus=1) +def prod_func(): + input = torch.randn(4, dtype=torch.float32, device=torch.device("cpu")) + c1 = to_str(input) + c2 = to_str(torch.prod(input)) + + input_dpcpp = input.to("xpu") + g1 = to_str(input_dpcpp.cpu()) + g2 = to_str(torch.prod(input_dpcpp).cpu()) + + return (c1, c2, g1, g2) + + +def test_basic_prod(shutdown_only): + ray.init(num_cpus=1, num_gpus=1) + job = prod_func.remote() + + res = ray.get(job) + + print(f"res = {res}") + + assert res[0] == res[2] + assert res[1] == res[3] + + +def test_nms(shutdown_only): + ray.init(num_cpus=1, num_gpus=1) + + @ray.remote(num_cpus=1, num_gpus=1) + def nms_func(): + box = torch.FloatTensor([[2, 3.1, 1, 7], [3, 4, 8, 4.8], [4, 4, 5.6, 7], + [0.1, 0, 8, 1], [4, 4, 5.7, 7.2]]).xpu() + score = torch.FloatTensor([0.5, 0.3, 0.2, 0.4, 0.3]).xpu() + out_ref = torch.LongTensor([0, 3, 1, 4]) + out = torchvision.ops.nms(box, score, 0.3) + return (to_str(out.cpu(), to_str(out_ref))) + + job = nms_func.remote() + res = ray.get(job) + print(f"in test_nms, res = {res}") + assert res[0] == res[1] + + +def test_batched_nms(shutdown_only): + ray.init(num_cpus=1, num_gpus=1) + @ray.remote(num_cpus=1, num_gpus=1) + def batched_nms_func(): + box1 = torch.FloatTensor([[2, 3.1, 1, 7], [3, 4, 8, 4.8], [4, 4, 5.6, 7], + [0.1, 0, 8, 1], [4, 4, 5.7, 7.2]]) + score1 = torch.FloatTensor([0.5, 0.3, 0.2, 0.4, 0.3]) + idx1 = torch.LongTensor([2,1,3,4,0]) + box2 = torch.FloatTensor([[2, 3.1, 1, 5], [3, 4, 8, 4.8], [4, 4, 5.6, 7], + [0.1, 0, 6, 1], [4, 4, 5.7, 7.2]]) + score2 = torch.FloatTensor([0.5, 0.1, 0.2, 0.4, 0.8]) + idx2 = torch.LongTensor([0,1,2,4,3]) + boxes = torch.cat([box1, box2], dim=0).xpu() + scores = torch.cat([score1, score2], dim=0).xpu() + idxs = torch.cat([idx1, idx2], dim=0).xpu() + out = torchvision.ops.batched_nms(boxes, scores, idxs, 0.3) + out_ref = torch.LongTensor([9, 0, 5, 3, 1, 4, 7]) + return (to_str(out.cpu()), to_str(out_ref)) + job = batched_nms_func.remote() + res = ray.get(job) + + print(f"in test_batched_nms, res = {res}") + assert res[0] == res[1] + From 35cb47abb3a095b7a09bbbf56cccd425b1e6bbe8 Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Tue, 6 Jun 2023 16:27:32 +0800 Subject: [PATCH 06/10] update ut --- python/ray/tests/test_basic_xpu.py | 37 +++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/python/ray/tests/test_basic_xpu.py b/python/ray/tests/test_basic_xpu.py index ef7eac737612..ab8bac406b4c 100644 --- a/python/ray/tests/test_basic_xpu.py +++ b/python/ray/tests/test_basic_xpu.py @@ -122,7 +122,7 @@ def nms_func(): score = torch.FloatTensor([0.5, 0.3, 0.2, 0.4, 0.3]).xpu() out_ref = torch.LongTensor([0, 3, 1, 4]) out = torchvision.ops.nms(box, score, 0.3) - return (to_str(out.cpu(), to_str(out_ref))) + return (to_str(out.cpu()), to_str(out_ref)) job = nms_func.remote() res = ray.get(job) @@ -154,3 +154,38 @@ def batched_nms_func(): print(f"in test_batched_nms, res = {res}") assert res[0] == res[1] +""" +def test_linear(shutdown_only): + ray.init(num_cpus=0, num_gpus=1) + @ray.remote(num_cpus=0, num_gpus=1) + def cpu_task_func(): + device = torch.device("cpu:0") + x = torch.tensor([[1, 2, 3, 4, 5], + [2, 3, 4, 5, 6], + [3, 4, 5, 6, 7], + [4, 5, 6, 7, 8], + [5, 6, 7, 8, 9]], + device=device) + l = torch.nn.Linear(5, 5).to(device) + r = l(x) + return to_str(r) + + @ray.remote(num_cpus=1, num_gpus=0) + def xpu_task_func(): + device = torch.device("xpu:0") + x = torch.tensor([[1, 2, 3, 4, 5], + [2, 3, 4, 5, 6], + [3, 4, 5, 6, 7], + [4, 5, 6, 7, 8], + [5, 6, 7, 8, 9]], + device=device) + l = torch.nn.Linear(5, 5).to(device) + r = l(x) + return to_str() + + jobs = [cpu_task_func.remote(), xpu_task_func.remote()] + res = ray.get(jobs) + + print(to_str(res)) + assert res[0] == res[1] +""" From 363d7c82f5c8993ea0cab23f7853308710da064f Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Tue, 6 Jun 2023 16:48:38 +0800 Subject: [PATCH 07/10] update --- python/ray/tests/test_basic_xpu.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/python/ray/tests/test_basic_xpu.py b/python/ray/tests/test_basic_xpu.py index ab8bac406b4c..5f0abd173bd9 100644 --- a/python/ray/tests/test_basic_xpu.py +++ b/python/ray/tests/test_basic_xpu.py @@ -156,8 +156,8 @@ def batched_nms_func(): """ def test_linear(shutdown_only): - ray.init(num_cpus=0, num_gpus=1) - @ray.remote(num_cpus=0, num_gpus=1) + ray.init(num_cpus=1, num_gpus=1) + @ray.remote(num_cpus=1, num_gpus=0) def cpu_task_func(): device = torch.device("cpu:0") x = torch.tensor([[1, 2, 3, 4, 5], @@ -165,12 +165,13 @@ def cpu_task_func(): [3, 4, 5, 6, 7], [4, 5, 6, 7, 8], [5, 6, 7, 8, 9]], + dtype=torch.float, device=device) - l = torch.nn.Linear(5, 5).to(device) + l = torch.nn.Linear(5, 5).to(device, torch.float) r = l(x) return to_str(r) - @ray.remote(num_cpus=1, num_gpus=0) + @ray.remote(num_cpus=0, num_gpus=1) def xpu_task_func(): device = torch.device("xpu:0") x = torch.tensor([[1, 2, 3, 4, 5], @@ -178,10 +179,11 @@ def xpu_task_func(): [3, 4, 5, 6, 7], [4, 5, 6, 7, 8], [5, 6, 7, 8, 9]], + dtype=torch.float, device=device) - l = torch.nn.Linear(5, 5).to(device) + l = torch.nn.Linear(5, 5).to(device, torch.float) r = l(x) - return to_str() + return to_str(r) jobs = [cpu_task_func.remote(), xpu_task_func.remote()] res = ray.get(jobs) From c1e6db879d5aa9fa852579a623e8ec093c7e882f Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Thu, 8 Jun 2023 22:20:04 +0800 Subject: [PATCH 08/10] upgrade --- python/ray/_private/ray_constants.py | 7 +++- python/ray/_private/resource_spec.py | 41 +++++++++++++------ python/ray/_private/utils.py | 30 ++++++++++---- python/ray/_private/worker.py | 51 ++++++++++++++++++++++-- python/ray/_raylet.pyx | 2 +- python/ray/tests/conftest.py | 7 ++++ python/ray/tests/test_basic_xpu.py | 2 +- python/ray/tests/test_runtime_context.py | 29 ++++++++++++++ 8 files changed, 142 insertions(+), 27 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 590d3b343e35..d4324db92c13 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -379,8 +379,11 @@ def env_set_by_user(key): NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES" NOSET_XPU_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_XPU_VISIBLE_DEVICES" -XPU_BACKEND = "level_zero" -XPU_DEVICE_TYPE = "gpu" +RAY_DEVICE_XPU_BACKEND_TYPE = "level_zero" +RAY_DEVICE_XPU_DEVICE_TYPE = "gpu" +RAY_DEVICE_XPU_AS_GPU = True + +RAY_DEVICE_TYPES = {"CPU", "CUDA", "XPU"} # Default max_retries option in @ray.remote for non-actor # tasks. diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 39a2f4677349..9a1b59a586f5 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -4,13 +4,17 @@ import re import subprocess import sys -import dpctl from collections import namedtuple from typing import Optional import ray import ray._private.ray_constants as ray_constants +try: + import dpctl +except ImportError + pass + try: import GPUtil except ImportError: @@ -159,17 +163,20 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): resources[NODE_ID_PREFIX + node_ip_address] = 1.0 # get cpu num - num_cpus = self.num_cpus - if num_cpus is None: - num_cpus = ray._private.utils.get_num_cpus() + if "CPU" in RAY_DEVICE_TYPES: + num_cpus = self.num_cpus + if num_cpus is None: + num_cpus = ray._private.utils.get_num_cpus() - # get cuda gpu num - num_gpus, gpu_types = _get_cuda_info(self.num_gpus) - resources.update(gpu_types) + if "CUDA" in RAY_DEVICE_TYPES: # get cuda gpu num + num_gpus, gpu_types = _get_cuda_info(self.num_gpus) + resources.update(gpu_types) - # add xpu num, xpu includes [cpu, gpu, fpga] - # and device num can be filtered by backend and device_type - num_gpus += _get_num_xpus() + if "XPU" in RAY_DEVICE_TYPES: + if os.environ.get(RAY_DEVICE_XPU_AS_GPU, "True"): + num_xpus, xpu_types = _get_(self.num_gpus) + num_gpus += num_xpus + resource.udpate(xpu_types) # Choose a default object store size. system_memory = ray._private.utils.get_system_memory() @@ -278,9 +285,17 @@ def _get_cuda_info(num_gpus): return num_gpus, gpu_types -def _get_num_xpus(): - num_gpus = len(dpctl.get_devices(backend=XPU_BACKEND, device_type=XPU_DEVICE_TYPE))) - return num_gpus +def _get_xpu_info(num_xpus): + """Attempt to process the number of XPUs as GPUs + """ + xpu_ids = ray._private.utils.get_xpu_visible_devices() + if num_xpus is None: + num_xpus = len(dpctl.get_devices(backend=RAY_DEVICE_XPU_BACKEND_TYPE, + device_type=RAY_DEVICE_XPU_DEVICE_TYPE)) + if xpu_ids is not None: + num_xpus = min(num_xpus, len(xpu_ids)) + xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" "xpu": 1} + return num_xpus, xpu_types def _autodetect_num_gpus(): diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 9f7e95ca0c49..0e6f51663e99 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -326,6 +326,24 @@ def get_cuda_visible_devices(): last_set_gpu_ids = None +def get_xpu_visible_devices(): + """Get the devices IDs in the XPU_VISIBLE_DEVICES environment variable. + + Returns: + devices (List[str]): If XPU_VISIBLE_DEVICES is set, return a + list of strings representing the IDs of the visible XPUs. + If it is not set or is set, returns empty list. + """ + xpu_ids_str = os.environ.get("XPU_VISIBLE_DEVICES", None) + if xpu_ids_str is None: + return None + + if xpu_ids_str == "": + return [] + + return list(xpu_ids_str.split(",")) + + def set_omp_num_threads_if_unset() -> bool: """Set the OMP_NUM_THREADS to default to num cpus assigned to the worker @@ -384,21 +402,19 @@ def set_cuda_visible_devices(gpu_ids): last_set_gpu_ids = gpu_ids -def set_xpu_visible_devices(sycl_devices : List[dpctl.SyclDevice]): +def set_xpu_visible_devices(xpu_ids): """Set the ONEAPI_DEVICE_SELECTOR environment variable. Args: - sycl_devices: List of sycl device. + xpu_ids (List[str]): List of strings representing GPU IDs """ if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR): return - # TODO check correctly way to construct the ONEAPI_DEVICE_SELECTOR - selection = "" - for info in sycl_devices: - selection.append(f"{info.backend}:{info.device_type};}") - os.environ["ONEAPI_DEVICE_SELECTOR"] = selection + ids_str = ",".join([str(i) for i in xpu_ids]) + os.environ["XPU_VISIBLE_DEVICES"] = ids_str + os.environ["ONEAPI_DEVICE_SELECTOR"] = RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 738116e57598..5ba428cf471b 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -434,6 +434,8 @@ def __init__(self): # When the worker is constructed. Record the original value of the # CUDA_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray._private.utils.get_cuda_visible_devices() + # record the original value of the XPU_VISIBLE_DEVICES environment variable. + self.original_xpu_ids = ray._private.utils.get_xpu_visible_devices() # A dictionary that maps from driver id to SerializationContext # TODO: clean up the SerializationContext once the job finished. self.serialization_context_map = {} @@ -949,11 +951,54 @@ def get_gpu_ids(): return assigned_ids + @PublicAPI @client_mode_hook(auto_init=True) -def get_xpu_devices(): - return dpctl.get_devices(backend=XPU_BACKEDN, device_type=XPU_DEVICE_TYPE) - +def get_xpu_ids(): + """ Get the IDs of the XPUs that are available to the worker. + + If the XPU_VISIBLE_DEVICES environment variable was set when the worker + started up, + + Returns: + A list of XPU IDs + """ + worker = global_worker + worker.check_connected() + + if worker.mode != WORKER_MODE: + if log_once("worker_get_gpu_ids_empty_from_driver"): + logger.warning( + "`ray.get_xpu_ids()` will always return the empty list when " + "called from the driver. This is because Ray does not manage " + "XPU allocations to the driver process." + ) + # here we use `dpctl` to detect XPU device + + # xpu_devices get all device under environment varaible ONEAPI_DEVICE_SELECTOR + xpu_devices = dpctl.get_devices(backend=RAY_DEVICE_XPU_BACKEND_TYPE, + device_type=RAY_DEVICE_XPU_DEVICE_TYPE) + xpu_ids = [] + if len(xpu_devices) == 0: + return xpu_ids + + if global_worker.original_xpu_ids is None: + return range(len(xpu_devices)) + + # return ids should take care of 3 case: + # 1. XPU_VISIBLE_DEVICES define ids count equal count of xpu_devices + if len(xpu_devices) == len(global_worker.original_xpu_ids): + return global_worker.original_xpu_ids + + # 2. XPU_VISIBLE_DEVICES define ids count less than count of xpu_devices + # [2,3], level_zero:2,3,4 + if len(xpu_devices) < len(global_worker.or + + # 3. XPU_VISIBLE_DEVICES define ids count more than count of xpu_devices + + if len(xpu_devices) != global_worker.original_xpu_ids: + + return xpu_ids @Deprecated( message="Use ray.get_runtime_context().get_assigned_resources() instead.", diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index ec5143c7264d..76e876296547 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1048,7 +1048,7 @@ cdef execute_task_with_cancellation_handler( ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids()) # Automatically restrict the XPU device avaialable to this task - ray._private.utils.set_xpu_visible_devices(ray.get_xpu_devices()) + ray._private.utils.set_xpu_visible_devices(ray.get_xpu_ids()) # Automatically configure OMP_NUM_THREADS to the assigned CPU number. # It will be unset after the task execution if it was overwridden here. diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index cdc049ca860f..6534e4b2293c 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -354,6 +354,13 @@ def ray_start_10_cpus(request, maybe_external_redis): yield res +@pytest.fixture +def ray_start_1_cpus_6_xpus(request, maybe_external_redis): + param = getattr(request, "param", {}) + with _ray_start(num_cpus=1, num_gpus=6, **param) as res: + yield res + + @contextmanager def _ray_start_cluster(**kwargs): cluster_not_supported_ = kwargs.pop("skip_cluster", cluster_not_supported) diff --git a/python/ray/tests/test_basic_xpu.py b/python/ray/tests/test_basic_xpu.py index 5f0abd173bd9..58ec4c88361f 100644 --- a/python/ray/tests/test_basic_xpu.py +++ b/python/ray/tests/test_basic_xpu.py @@ -171,7 +171,7 @@ def cpu_task_func(): r = l(x) return to_str(r) - @ray.remote(num_cpus=0, num_gpus=1) + @ray.remote(num_cpus=0, num_xpus=1) def xpu_task_func(): device = torch.device("xpu:0") x = torch.tensor([[1, 2, 3, 4, 5], diff --git a/python/ray/tests/test_runtime_context.py b/python/ray/tests/test_runtime_context.py index 42b7b5fed42e..1a58301a0d02 100644 --- a/python/ray/tests/test_runtime_context.py +++ b/python/ray/tests/test_runtime_context.py @@ -148,6 +148,35 @@ def check(): assert ray.get(result)["CPU"] == 2.0 +def test_get_assigned_resources_gpu(ray_start_1_cpus_6_xpus): + os.environ.set(ray._private.ray_constants.RAY_DEVICE_XPU_AS_GPU, True) + @ray.remote + class Echo: + def check(self): + return ray.get_runtime_context().get_assigned_resources() + + e = Echo.remote() + result = e.check.remote() + print(ray.get(result)) + assert ray.get(result).get("GPU") is None + ray.kill(e) + + e = Echo.options(num_gpus=4).remote() + result = e.check.remote() + assert ray.get(result)["GPU"] == 4.0 + ray.kill(e) + + @ray.remote + def check(): + return ray.get_runtime_context().get_assigned_resources() + + result = check.remote() + assert ray.get(result)["GPU"] == 1.0 + + result = check.options(num_gpus=2).remote() + assert ray.get(result)["GPU"] == 2.0 + + def test_actor_stats_normal_task(ray_start_regular): # Because it works at the core worker level, this API works for tasks. @ray.remote From b9a7a4299c2d0f0d15e7dd4fb78ae71ce6e6d8c3 Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Fri, 9 Jun 2023 15:35:02 +0800 Subject: [PATCH 09/10] upgrade --- python/ray/_private/ray_constants.py | 1 + python/ray/_private/resource_spec.py | 18 ++++++++++--- python/ray/_private/worker.py | 39 ++++++++++++---------------- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index d4324db92c13..9dffe6dafd16 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -379,6 +379,7 @@ def env_set_by_user(key): NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES" NOSET_XPU_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_XPU_VISIBLE_DEVICES" +RAY_DEVICE_XPU_SELECTOR_ENV_VAR = "ONEAPI_DEVICE_SELECTOR" RAY_DEVICE_XPU_BACKEND_TYPE = "level_zero" RAY_DEVICE_XPU_DEVICE_TYPE = "gpu" RAY_DEVICE_XPU_AS_GPU = True diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 9a1b59a586f5..577b0e76ef3b 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -168,13 +168,17 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if num_cpus is None: num_cpus = ray._private.utils.get_num_cpus() - if "CUDA" in RAY_DEVICE_TYPES: # get cuda gpu num + # get cuda gpu num + if "CUDA" in RAY_DEVICE_TYPES: num_gpus, gpu_types = _get_cuda_info(self.num_gpus) resources.update(gpu_types) + # get xpu num if "XPU" in RAY_DEVICE_TYPES: - if os.environ.get(RAY_DEVICE_XPU_AS_GPU, "True"): - num_xpus, xpu_types = _get_(self.num_gpus) + if os.environ.get(ray_constants.RAY_DEVICE_XPU_AS_GPU, "True"): + # here we take xpu as gpu, so no need to develop core's scheduling policy + # If we don't want to take xpu as gpu, ray core need to develop new scheduling policy + num_xpus, xpu_types = _get_xpu_info(self.num_gpus) num_gpus += num_xpus resource.udpate(xpu_types) @@ -287,8 +291,16 @@ def _get_cuda_info(num_gpus): def _get_xpu_info(num_xpus): """Attempt to process the number of XPUs as GPUs + + Returns: + The number of XPUs """ xpu_ids = ray._private.utils.get_xpu_visible_devices() + if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids): + raise ValueError( + "Attempting to start raylet with {} XPUs, " + "but XPU_VISIBLE_DEVICES contains {}.".format(num_xpus, xpu_ids) + ) if num_xpus is None: num_xpus = len(dpctl.get_devices(backend=RAY_DEVICE_XPU_BACKEND_TYPE, device_type=RAY_DEVICE_XPU_DEVICE_TYPE)) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 5ba428cf471b..d57722d35e2f 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -973,30 +973,25 @@ def get_xpu_ids(): "called from the driver. This is because Ray does not manage " "XPU allocations to the driver process." ) - # here we use `dpctl` to detect XPU device + # Here we use `dpctl` to detect XPU device: + # Enumrate all device by API dpctl.get_devices + # Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset + # Or dpctl.get_devices will only return filtered device set by ONEAPI_DEVICE_SELECTOR + # Another method to enumrate XPU device is to use C++ API, maybe can upgrade laster + + xpu_devices = dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE, + device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE) + xpu_ava_ids = set() + xpu_dev_prefix = f"{ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE}:{ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE}" + for xpu_dev in xpu_devices: + xpu_id = int(xpu_dev.filter_string.split(xpu_dev_prefix)[1]) + xpu_ava_ids.add(xpu_id) - # xpu_devices get all device under environment varaible ONEAPI_DEVICE_SELECTOR - xpu_devices = dpctl.get_devices(backend=RAY_DEVICE_XPU_BACKEND_TYPE, - device_type=RAY_DEVICE_XPU_DEVICE_TYPE) xpu_ids = [] - if len(xpu_devices) == 0: - return xpu_ids - - if global_worker.original_xpu_ids is None: - return range(len(xpu_devices)) - - # return ids should take care of 3 case: - # 1. XPU_VISIBLE_DEVICES define ids count equal count of xpu_devices - if len(xpu_devices) == len(global_worker.original_xpu_ids): - return global_worker.original_xpu_ids - - # 2. XPU_VISIBLE_DEVICES define ids count less than count of xpu_devices - # [2,3], level_zero:2,3,4 - if len(xpu_devices) < len(global_worker.or - - # 3. XPU_VISIBLE_DEVICES define ids count more than count of xpu_devices - - if len(xpu_devices) != global_worker.original_xpu_ids: + if global_worker.original_xpu_ids is not None: + xpu_ids = [ + global_worker.original_xpu_ids[xpu_id] for xpu_id in xpu_ava_ids + ] return xpu_ids From ab3492f7ef9fdebbfb0eb4b644c3a165aa2f9d69 Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Fri, 9 Jun 2023 15:47:44 +0800 Subject: [PATCH 10/10] update --- .../tests/xpu_input_files/multi_kernel.spv | Bin 1412 -> 0 bytes python/ray/train/tests/test_xpu.py | 59 ------------------ 2 files changed, 59 deletions(-) delete mode 100644 python/ray/tests/xpu_input_files/multi_kernel.spv delete mode 100644 python/ray/train/tests/test_xpu.py diff --git a/python/ray/tests/xpu_input_files/multi_kernel.spv b/python/ray/tests/xpu_input_files/multi_kernel.spv deleted file mode 100644 index 1fa772d8dfd583970caa6666704fe5d7434f1dda..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1412 zcma)*+iDY06oywjX;ZB+)>`Y?Ne>=M3D^r2L@5e}h#)?}Fl~Yar;{+9miEpIAKDwi z|JySw2FZmx?5y*@4tq_sxZN~cu&3hk-*@53S=W=UFnOUld1u&cQ4C@=T+ z|NdFiTJ~(7ukikLR@go{qVO|2Y(>gPHk*F+~}F&bP{^7>SM5prxu zu4dG*+f&;?u-(O`OD*?AJ^y|2RsP5xh_C8x*hBHNwLHderj}UpY%N<;xvFJH^2u2i z^6iTEx}Hq^4y>mhpG!u^Ec<$KWc0|)jGB6?8JV7XdN+9d{%7{=)+bvD{@^itX3jjA z1O0C(ANzKw%R$y6Ta2KyDLE3w2O3Lmh=Ps0E(`mJenph}i0?>yMHCx)za3dO=je%f zQ)7bx{!5_|&;E13d!?*k(U0@=*qo##!R=WXH1%^oom_S$FiNc_B+uhn46iboH=WAcsA517me89&GKIsE~VDR=Pz diff --git a/python/ray/train/tests/test_xpu.py b/python/ray/train/tests/test_xpu.py deleted file mode 100644 index 6fad61bce6fc..000000000000 --- a/python/ray/train/tests/test_xpu.py +++ /dev/null @@ -1,59 +0,0 @@ -import os -import time - -from unittest.mock import patch -import pytest -import numpy as np -import torch -import torchvision -from torch.nn.parallel import DistributedDataParallel -from torch.utils.data import DataLoader, DistributedSampler - -import ray -import ray.data -from ray.exceptions import RayTaskError -from ray.air import session -from ray import tune - -import ray.train as train -from ray.air.config import ScalingConfig -from ray.train.constants import DEFAULT_NCCL_SOCKET_IFNAME -from ray.train.examples.pytorch.torch_linear_example import LinearDataset -from ray.train.torch.config import TorchConfig, _TorchBackend -from ray.train.torch.torch_trainer import TorchTrainer -from ray.train.trainer import TrainingFailedError -from ray.train._internal.worker_group import WorkerGroup - - -try: - import intel_extension_for_pytorch as ipex -except ImportError: - pass - - -def test_torch_prepare_model_uses_device(ray_start_4_cpus_2_gpus): - """Tests if `prepare_model` uses the train.torch.get_device even if it does not - match with the local rank.""" - # The below test should pass without errors. - - @patch.object( - ray.train.torch.train_loop_utils, - "get_device", - lambda: torch.device(f"cuda:{1 - session.get_local_rank()}"), - ) - def train_func(): - # These assert statements must hold for prepare_model to wrap with DDP. - ######## code changes ####### - model = torch.nn.Linear(1, 1) - model = model.to("xpu") - data = torch.ones(1) - data = data.to("xpu") - model = ipex.optimize(model) - ######## code changes ####### - model = train.torch.prepare_model(model) - model(data) - - trainer = TorchTrainer( - train_func, scaling_config=ScalingConfig(num_workers=2, use_gpu=True) - ) - trainer.fit()