Skip to content

[feat] Add RayStorageManager to backend#27

Open
Evelynn-V wants to merge 6 commits intoAscend:mainfrom
Evelynn-V:RDT_backend
Open

[feat] Add RayStorageManager to backend#27
Evelynn-V wants to merge 6 commits intoAscend:mainfrom
Evelynn-V:RDT_backend

Conversation

@Evelynn-V
Copy link
Contributor

@Evelynn-V Evelynn-V commented Feb 6, 2026

Background

In order to align with the backend mentioned in the PR#26, I have extracted the RayStorageManager to manage the RDT backend

Use Case

import ray
import time
import torch
import sys
from pathlib import Path
from omegaconf import OmegaConf
from tensordict import TensorDict
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

parent_dir = Path(__file__).resolve().parent.parent
sys.path.append(str(parent_dir))

import transfer_queue as tq
from transfer_queue.metadata import BatchMeta


def tensordict_memory_mb(td):
    total_bytes = sum(tensor.element_size() * tensor.numel() for tensor in td.values())
    return total_bytes / (1024 * 1024)


@ray.remote
class WriterActor:
    def __init__(self):
        tq.init()
        self.data = None

    def generate_data(self, batch_size: int = 10000, seq_len: int = 10000):
        self.data = TensorDict({
            "input_ids": torch.randn(batch_size, seq_len, dtype=torch.float32),
        }, batch_size=batch_size)

        size = tensordict_memory_mb(self.data)
        print(f"[Writer] Data generated. Memory usage: {size:.2f} MB")

    def put_once(self, partition_id):
        t0 = time.time()
        batch_meta = tq.put(data=self.data, partition_id=partition_id)
        t1 = time.time()
        return t1 - t0, batch_meta


@ray.remote
class ReaderActor:
    def __init__(self):
        tq.init()

    def get_once(self, metadata: BatchMeta):
        t0 = time.perf_counter()
        result = tq.get_data(metadata)
        t1 = time.perf_counter()
        return t1 - t0


def main():
    if not ray.is_initialized():
        ray.init(address="auto")
    print("Initialize TransferQueue System...")
    tq.init()

    nodes = ray.nodes()
    ip_to_nodeid = {}
    for n in nodes:
        addr = n.get("NodeManagerAddress") or n.get("node_ip_address") or n.get("NodeIP")
        node_id = n["NodeID"] if "NodeID" in n else n.get("NodeID") or n.get("node_id")
        if addr and node_id:
            ip_to_nodeid[addr] = node_id

    ip_A = ""  # Writer 
    ip_B = ""  # Reader
    node_id_A = ip_to_nodeid.get(ip_A)
    node_id_B = ip_to_nodeid.get(ip_B)

    if not node_id_A or not node_id_B:
        print(f"Warning: Specific nodes not found. Available IPs: {list(ip_to_nodeid.keys())}")
        node_id_A = list(ip_to_nodeid.values())[0]
        node_id_B = list(ip_to_nodeid.values())[-1]

    writer = WriterActor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id_A, soft=False),
    ).remote()

    reader = ReaderActor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node_id_B, soft=False),
    ).remote()

    batch_size = 512
    seq_len = 32 * 1024

    partition_id = "train_step"
    ray.get(writer.generate_data.remote(batch_size, seq_len))
    cost_put, batch_meta = ray.get(writer.put_once.remote(partition_id))
    cost_get = ray.get(reader.get_once.remote(meta))

    tq.close()
    print("Test Finished.")

if __name__ == "__main__":
    main()

Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a Ray-backed storage manager so TransferQueue can use the Ray/RDT KV backend via a dedicated RayStorageManager, aligning with the backend naming/initialization approach introduced in PR #26.

Changes:

  • Introduces RayStorageManager registered as the RayStore backend type.
  • Exposes RayStorageManager through transfer_queue.storage and transfer_queue.storage.managers exports.
  • Adds a default backend.RayStore block to transfer_queue/config.yaml (with client_name: RayStorageClient).

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File Description
transfer_queue/storage/managers/ray_storage_manager.py New storage manager wrapper for Ray KV backend; validates/defaults client_name and delegates to KVStorageManager.
transfer_queue/storage/managers/__init__.py Re-exports RayStorageManager from the managers package.
transfer_queue/storage/__init__.py Re-exports RayStorageManager from the storage package.
transfer_queue/config.yaml Adds default configuration section for backend.RayStore.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +27 to +29
RayStore:
client_name: RayStorageClient

Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a short inline comment/header for this new RayStore block (matching the existing "# For SimpleStorage" section), and also update the earlier backend.storage_backend comment list to include RayStore so users discover the new backend option from the default config.

Copilot uses AI. Check for mistakes.
Comment on lines +32 to +40
def __init__(self, controller_info: ZMQServerInfo, config: dict[str, Any]):
client_name = config.get("client_name", None)

if client_name is None:
logger.info("Missing 'client_name' in config, using default value('RayStorageClient')")
config["client_name"] = "RayStorageClient"
elif client_name != "RayStorageClient":
raise ValueError(f"Invalid 'client_name': {client_name} in config. Expecting 'RayStorageClient'")
super().__init__(controller_info, config)
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RayStorageManager introduces backend-specific config validation (defaulting/validating client_name) but there are no unit tests covering this behavior. Add a small test that constructs the manager (with controller connect mocked like other tests) to verify: (1) missing client_name is defaulted to RayStorageClient, and (2) a non-RayStorageClient value raises the expected ValueError.

Copilot uses AI. Check for mistakes.
@tianyi-ge
Copy link
Contributor

look good to merge

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants