From 01606ff36cb8cac6130717a84cf7a2c23811b5e9 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Mon, 26 Jan 2026 15:27:57 +0800 Subject: [PATCH 1/5] add ds zero copy in CPU Tensor Signed-off-by: Evelynn-V --- .github/workflows/python-package.yml | 2 +- tests/test_yuanrong_storage_manager.py | 8 ++++---- transfer_queue/storage/clients/yuanrong_client.py | 14 ++++++++++++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 2917c78..cadce43 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -31,7 +31,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 pytest build pytest_asyncio + python -m pip install flake8 pytest build pytest_asyncio pytest-mock python -m build --wheel pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu pip install dist/*.whl diff --git a/tests/test_yuanrong_storage_manager.py b/tests/test_yuanrong_storage_manager.py index 141d9dc..4c38b7c 100644 --- a/tests/test_yuanrong_storage_manager.py +++ b/tests/test_yuanrong_storage_manager.py @@ -33,7 +33,7 @@ class MockBuffer: def __init__(self, size): self.data = bytearray(size) - def mutable_data(self): + def MutableData(self): return self.data @@ -69,15 +69,15 @@ def mock_deserialization(items): except UnicodeDecodeError: return data - mocker.patch("transfer_queue.storage.clients.yuanrong_client.serialization", side_effect=mock_serialization) - mocker.patch("transfer_queue.storage.clients.yuanrong_client.deserialization", side_effect=mock_deserialization) + mocker.patch("transfer_queue.storage.clients.yuanrong_client._encoder.encode", side_effect=mock_serialization) + mocker.patch("transfer_queue.storage.clients.yuanrong_client._decoder.decode", side_effect=mock_deserialization) stored_raw_buffers = [] def side_effect_mcreate(keys, sizes): buffers = [MockBuffer(size) for size in sizes] for b in buffers: - stored_raw_buffers.append(b.mutable_data()) + stored_raw_buffers.append(b.MutableData()) return 0, buffers storage_client._cpu_ds_client.mcreate.side_effect = side_effect_mcreate diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 5fa5284..c233472 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -185,6 +185,12 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes): return tensors def mset_zcopy(self, keys: list[str], objs: list[Any]): + """Store multiple objects in zero-copy mode using parallel serialization and buffer packing. + + Args: + keys (list[str]): List of string keys under which the objects will be stored. + objs (list[Any]): List of Python objects to store (e.g., tensors, strings). + """ items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs] packed_sizes = [calc_packed_size(items) for items in items_list] status, buffers = self._cpu_ds_client.mcreate(keys, packed_sizes) @@ -194,6 +200,14 @@ def mset_zcopy(self, keys: list[str], objs: list[Any]): self._cpu_ds_client.mset_buffer(buffers) def mget_zcopy(self, keys: list[str]) -> list[Any]: + """Retrieve multiple objects in zero-copy mode by directly deserializing from shared memory buffers. + + Args: + keys (list[str]): List of string keys to retrieve from storage. + + Returns: + list[Any]: List of deserialized objects corresponding to the input keys. + """ status, buffers = self._cpu_ds_client.get_buffers(keys, timeout_ms=500) return [_decoder.decode(unpack_from(buffer)) if buffer is not None else None for buffer in buffers] From 6f340795e534f7714af241c9153423c8259c69b4 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Fri, 6 Feb 2026 15:08:16 +0800 Subject: [PATCH 2/5] add ray_storage_manager to backend Signed-off-by: Evelynn-V --- transfer_queue/storage/__init__.py | 2 + .../storage/managers/ray_storage_manager.py | 39 +++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 transfer_queue/storage/managers/ray_storage_manager.py diff --git a/transfer_queue/storage/__init__.py b/transfer_queue/storage/__init__.py index bac9a4c..cdef95b 100644 --- a/transfer_queue/storage/__init__.py +++ b/transfer_queue/storage/__init__.py @@ -19,6 +19,7 @@ TransferQueueStorageManager, TransferQueueStorageManagerFactory, YuanrongStorageManager, + RayStorageManager, ) from .simple_backend import SimpleStorageUnit, StorageMetaGroup, StorageUnitData @@ -31,4 +32,5 @@ "AsyncSimpleStorageManager", "MooncakeStorageManager", "YuanrongStorageManager", + "RayStorageManager", ] diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py new file mode 100644 index 0000000..999f0e2 --- /dev/null +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -0,0 +1,39 @@ +# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright 2025 The TransferQueue Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +from typing import Any + +from transfer_queue.storage.managers.base import KVStorageManager +from transfer_queue.storage.managers.factory import TransferQueueStorageManagerFactory +from transfer_queue.utils.zmq_utils import ZMQServerInfo + +logger = logging.getLogger(__name__) +logger.setLevel(os.getenv("TQ_LOGGING_LEVEL", logging.WARNING)) + +@TransferQueueStorageManagerFactory.register("RayStore") +class RayStorageManager(KVStorageManager): + """Storage manager for Ray-RDT backend.""" + + def __init__(self, controller_info: ZMQServerInfo, config: dict[str, Any]): + client_name = config.get("client_name", None) + + if client_name is None: + logger.info("Missing 'client_name' in config, using default value('RayStorageClient')") + config["client_name"] = "RayStorageClient" + elif client_name != "RayStorageClient": + raise ValueError(f"Invalid 'client_name': {client_name} in config. Expecting 'RayStorageClient'") + super().__init__(controller_info, config) From ab7b406d8bc9b6664781611d1d21d8db1fdb765e Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Fri, 6 Feb 2026 15:26:14 +0800 Subject: [PATCH 3/5] fix the ci Signed-off-by: Evelynn-V --- transfer_queue/storage/managers/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/transfer_queue/storage/managers/__init__.py b/transfer_queue/storage/managers/__init__.py index 9702bb1..48d4073 100644 --- a/transfer_queue/storage/managers/__init__.py +++ b/transfer_queue/storage/managers/__init__.py @@ -18,6 +18,7 @@ from .mooncake_manager import MooncakeStorageManager from .simple_backend_manager import AsyncSimpleStorageManager from .yuanrong_manager import YuanrongStorageManager +from .ray_storage_manager import RayStorageManager __all__ = [ "TransferQueueStorageManager", @@ -25,4 +26,5 @@ "AsyncSimpleStorageManager", "YuanrongStorageManager", "MooncakeStorageManager", + "RayStorageManager", ] From f1e3c9658ad2aa0e1519ee03bbd5189e5536e71b Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Fri, 6 Feb 2026 15:44:06 +0800 Subject: [PATCH 4/5] fix pre-commit Signed-off-by: Evelynn-V --- transfer_queue/storage/__init__.py | 2 +- transfer_queue/storage/managers/__init__.py | 2 +- transfer_queue/storage/managers/ray_storage_manager.py | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/transfer_queue/storage/__init__.py b/transfer_queue/storage/__init__.py index cdef95b..04b0745 100644 --- a/transfer_queue/storage/__init__.py +++ b/transfer_queue/storage/__init__.py @@ -16,10 +16,10 @@ from .managers import ( AsyncSimpleStorageManager, MooncakeStorageManager, + RayStorageManager, TransferQueueStorageManager, TransferQueueStorageManagerFactory, YuanrongStorageManager, - RayStorageManager, ) from .simple_backend import SimpleStorageUnit, StorageMetaGroup, StorageUnitData diff --git a/transfer_queue/storage/managers/__init__.py b/transfer_queue/storage/managers/__init__.py index 48d4073..77954bb 100644 --- a/transfer_queue/storage/managers/__init__.py +++ b/transfer_queue/storage/managers/__init__.py @@ -16,9 +16,9 @@ from .base import TransferQueueStorageManager from .factory import TransferQueueStorageManagerFactory from .mooncake_manager import MooncakeStorageManager +from .ray_storage_manager import RayStorageManager from .simple_backend_manager import AsyncSimpleStorageManager from .yuanrong_manager import YuanrongStorageManager -from .ray_storage_manager import RayStorageManager __all__ = [ "TransferQueueStorageManager", diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py index 999f0e2..79d37f3 100644 --- a/transfer_queue/storage/managers/ray_storage_manager.py +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -24,6 +24,7 @@ logger = logging.getLogger(__name__) logger.setLevel(os.getenv("TQ_LOGGING_LEVEL", logging.WARNING)) + @TransferQueueStorageManagerFactory.register("RayStore") class RayStorageManager(KVStorageManager): """Storage manager for Ray-RDT backend.""" From 0261eeddef39d5d6de79a1bfe08325e309f48f07 Mon Sep 17 00:00:00 2001 From: Evelynn-V Date: Fri, 6 Feb 2026 16:31:11 +0800 Subject: [PATCH 5/5] add RDT to config.yaml Signed-off-by: Evelynn-V --- transfer_queue/config.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index 9503afa..98983e5 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -24,6 +24,9 @@ backend: # ZMQ Server IP & Ports (automatically generated during init) zmq_info: null + RayStore: + client_name: RayStorageClient + # For Yuanrong: # TODO