diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index 9503afa..98983e5 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -24,6 +24,9 @@ backend: # ZMQ Server IP & Ports (automatically generated during init) zmq_info: null + RayStore: + client_name: RayStorageClient + # For Yuanrong: # TODO diff --git a/transfer_queue/storage/__init__.py b/transfer_queue/storage/__init__.py index bac9a4c..04b0745 100644 --- a/transfer_queue/storage/__init__.py +++ b/transfer_queue/storage/__init__.py @@ -16,6 +16,7 @@ from .managers import ( AsyncSimpleStorageManager, MooncakeStorageManager, + RayStorageManager, TransferQueueStorageManager, TransferQueueStorageManagerFactory, YuanrongStorageManager, @@ -31,4 +32,5 @@ "AsyncSimpleStorageManager", "MooncakeStorageManager", "YuanrongStorageManager", + "RayStorageManager", ] diff --git a/transfer_queue/storage/managers/__init__.py b/transfer_queue/storage/managers/__init__.py index 9702bb1..77954bb 100644 --- a/transfer_queue/storage/managers/__init__.py +++ b/transfer_queue/storage/managers/__init__.py @@ -16,6 +16,7 @@ from .base import TransferQueueStorageManager from .factory import TransferQueueStorageManagerFactory from .mooncake_manager import MooncakeStorageManager +from .ray_storage_manager import RayStorageManager from .simple_backend_manager import AsyncSimpleStorageManager from .yuanrong_manager import YuanrongStorageManager @@ -25,4 +26,5 @@ "AsyncSimpleStorageManager", "YuanrongStorageManager", "MooncakeStorageManager", + "RayStorageManager", ] diff --git a/transfer_queue/storage/managers/ray_storage_manager.py b/transfer_queue/storage/managers/ray_storage_manager.py new file mode 100644 index 0000000..79d37f3 --- /dev/null +++ b/transfer_queue/storage/managers/ray_storage_manager.py @@ -0,0 +1,40 @@ +# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright 2025 The TransferQueue Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +from typing import Any + +from transfer_queue.storage.managers.base import KVStorageManager +from transfer_queue.storage.managers.factory import TransferQueueStorageManagerFactory +from transfer_queue.utils.zmq_utils import ZMQServerInfo + +logger = logging.getLogger(__name__) +logger.setLevel(os.getenv("TQ_LOGGING_LEVEL", logging.WARNING)) + + +@TransferQueueStorageManagerFactory.register("RayStore") +class RayStorageManager(KVStorageManager): + """Storage manager for Ray-RDT backend.""" + + def __init__(self, controller_info: ZMQServerInfo, config: dict[str, Any]): + client_name = config.get("client_name", None) + + if client_name is None: + logger.info("Missing 'client_name' in config, using default value('RayStorageClient')") + config["client_name"] = "RayStorageClient" + elif client_name != "RayStorageClient": + raise ValueError(f"Invalid 'client_name': {client_name} in config. Expecting 'RayStorageClient'") + super().__init__(controller_info, config)