Open
35 commits
0e87f97
add kv interface
0oshowero0 Feb 6, 2026
4836b48
todo: fix ut
0oshowero0 Feb 6, 2026
b02e648
Merge branch 'upstream-main' into kv_interface
0oshowero0 Feb 6, 2026
c1cb09f
try add tutorial
0oshowero0 Feb 7, 2026
c53e5fa
provide basic runnable tutorial
0oshowero0 Feb 7, 2026
85567f3
fix other tutorials
0oshowero0 Feb 7, 2026
063b6b1
update tutorial
0oshowero0 Feb 7, 2026
1a86e20
fix
0oshowero0 Feb 7, 2026
e1f6ecf
fix cornercase
0oshowero0 Feb 7, 2026
f1c31ee
minor fix
0oshowero0 Feb 7, 2026
3ecf4d3
add ut
0oshowero0 Feb 7, 2026
4d479ab
try fix tutorial ci
0oshowero0 Feb 7, 2026
5556278
try only run 06
0oshowero0 Feb 7, 2026
4f6ce68
try add sleep in tutorial
0oshowero0 Feb 7, 2026
e7ea8e2
fix race condition
0oshowero0 Feb 7, 2026
29f5a1e
fix race condition
0oshowero0 Feb 7, 2026
e161571
recover ci
0oshowero0 Feb 7, 2026
928edda
fix batchmeta deserial
0oshowero0 Feb 8, 2026
ac97795
update kv interface test
0oshowero0 Feb 8, 2026
99b7fc9
update readme
0oshowero0 Feb 8, 2026
8e80d6d
fix comments
0oshowero0 Feb 8, 2026
b6b7d4b
fix comment
0oshowero0 Feb 8, 2026
d3dd014
fix more minor comments
0oshowero0 Feb 8, 2026
1eda4e9
fix comments and add FIXME comments
0oshowero0 Feb 8, 2026
620edfe
improve kv_list
0oshowero0 Feb 9, 2026
3678eac
provide KVBatchMeta
0oshowero0 Feb 9, 2026
42a78cf
add KVBatchMeta tests
0oshowero0 Feb 9, 2026
b301c74
fix comments
0oshowero0 Feb 9, 2026
f53cad9
change default value to None in KVBatchMeta
0oshowero0 Feb 9, 2026
f9a9830
update
0oshowero0 Feb 9, 2026
f49741d
change function order
0oshowero0 Feb 9, 2026
1bac20f
simplify interface
0oshowero0 Feb 9, 2026
3830bcf
fix ut
0oshowero0 Feb 9, 2026
5ec124b
fix tutorial
0oshowero0 Feb 9, 2026
e9e5872
fix pre-commit
0oshowero0 Feb 9, 2026
1 change: 1 addition & 0 deletions .github/workflows/tutorial-check.yml
@@ -36,4 +36,5 @@ jobs:
- name: Run tutorials
run: |
export TQ_NUM_THREADS=2
export RAY_DEDUP_LOGS=0
for file in tutorial/*.py; do python3 "$file"; done
79 changes: 47 additions & 32 deletions README.md
@@ -31,7 +31,8 @@ TransferQueue offers **fine-grained, sub-sample-level** data management and **lo

<h2 id="updates">🔄 Updates</h2>

- **Jan 28, 2026**: We experimentally introduce `StreamingDataloader` interface for fully-streamed production-consumption pipeline. Refer to our [tutorials/05_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_streaming_dataloader.py) for details.
- **Feb 8, 2026**: 🔥 Initialization and usage are greatly simplified by the high-level APIs [PR#26](https://github.com/Ascend/TransferQueue/pull/26), [PR#28](https://github.com/Ascend/TransferQueue/pull/28). You can now use a Redis-style API to take advantage of most of the advanced features provided by TransferQueue!
- **Jan 28, 2026**: We experimentally introduce the `StreamingDataLoader` interface for a fully streamed production-consumption pipeline. Refer to our [tutorials/06_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/06_streaming_dataloader.py) for details.
- **Dec 30, 2025**: **TransferQueue x verl** integration is tested with the DAPO algorithm at scale **(64 nodes, 1024 cards)**. It significantly optimizes host memory utilization and accelerates data transfers. Stay tuned for more details!
- **Dec 20, 2025**: 🔥 The official [tutorial](https://github.com/Ascend/TransferQueue/tree/main/tutorial) is released! Feel free to check it out.
- **Nov 10, 2025**: We disentangle the data retrieval logic from TransferQueueController [PR#101](https://github.com/TransferQueue/TransferQueue/pull/101). Now you can implement your own `Sampler` to control how to consume the data.
@@ -91,26 +92,55 @@ This data structure design is motivated by the computational characteristics of
<img src="https://github.com/TransferQueue/community_doc/blob/main/docs/data_plane.png?raw=true" width="70%">
</p>

### User Interface: Asynchronous & Synchronous Client
To simplify the usage of TransferQueue, we have encapsulated this process into `TransferQueueClient`. The client provides both asynchronous and synchronous interfaces for data transfer, allowing users to easily integrate TransferQueue into their framework.
### User Interface: High-Level & Low-Level APIs

We also experimentally provide a `StreamingDataLoader` interface as a standard PyTorch DataLoader. Leveraging this abstraction, each rank can automatically get its own data like `DataLoader` in PyTorch. The TransferQueue system will handle the underlying data scheduling and transfer logic caused by different parallelism strategies, significantly simplifying the design of disaggregated frameworks.
This interface simplifies TransferQueue's integration, ensuring seamless compatibility with existing training workflows. Please refer to our [Roadmap](https://github.com/Ascend/TransferQueue/issues/1) and [tutorials/05_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_streaming_dataloader.py) for more details.
| Level | Tier | Style | Fine-Grained Access | Streaming | Sampler | Multiple-Backends |
|---|---|---|---|------------------|---|---|
| High | **KV Interface** (this PR) | Put/Get/List/Clear | ✓ | ○ | ✗ | ✓ |
| High | **StreamingDataLoader** (#23) | PyTorch DataLoader | ✓ | ✓ | ✓ | ✓ |
| Low | **TransferQueueClient** | Metadata-based | ✓ | ✓ | ✓ | ✓ |

<h2 id="show-cases">🔥 Showcases</h2>

### General Usage
#### Key-Value based API

To simplify the usage of TransferQueue, we provide a Redis-style high-level API that exposes most of the advanced features of TransferQueue ([#PR28](https://github.com/Ascend/TransferQueue/pull/28)).

**Methods**

- **(async_)kv_put**: Insert/Update a multi-column sample by key, with optional metadata tag
- **(async_)kv_batch_put**: Put multiple key-value pairs efficiently in batch
- **(async_)kv_batch_get**: Retrieve samples (by keys), supporting column selection (by fields)
- **(async_)kv_list**: List keys and tags (metadata) in a partition
- **(async_)kv_clear**: Remove key-value pairs from storage

**Key Features**

- **Redis-style Semantics**: Familiar KV interface (Put/Get/List) for zero learning curve
- **Fine-grained Access**: Update or retrieve specific fields (columns) within a key (row) without reading or writing the full row.
- **Partition Isolation**: Logical separation of storage namespaces
- **Metadata Tags**: Lightweight metadata for status tracking
- **Pluggable Backends**: Supports multiple backends
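
The semantics listed above can be illustrated with a small in-memory stand-in. This is only a sketch: `ToyKVStore`, its argument names (`fields`, `partition_id`, `tag`) and its return shapes are assumptions made for illustration and are not the actual TransferQueue signatures; see the tutorials for the real API.

```python
from collections import defaultdict


class ToyKVStore:
    """Hypothetical in-memory stand-in; NOT the TransferQueue API."""

    def __init__(self):
        # partition_id -> key -> {"fields": {column: value}, "tag": ...}
        self._partitions = defaultdict(dict)

    def kv_put(self, key, fields, partition_id="default", tag=None):
        """Insert a row, or update only the given columns of an existing row."""
        rows = self._partitions[partition_id]
        row = rows.setdefault(key, {"fields": {}, "tag": None})
        row["fields"].update(fields)  # untouched columns are preserved
        if tag is not None:
            row["tag"] = tag

    def kv_batch_put(self, keys, fields_list, partition_id="default", tags=None):
        """Put several rows in one call."""
        tags = tags if tags is not None else [None] * len(keys)
        for key, fields, tag in zip(keys, fields_list, tags):
            self.kv_put(key, fields, partition_id, tag)

    def kv_batch_get(self, keys, partition_id="default", fields=None):
        """Return the rows for `keys`, optionally restricted to some columns."""
        rows = self._partitions[partition_id]
        result = []
        for key in keys:
            stored = rows[key]["fields"]  # raises KeyError for unknown keys
            selected = stored if fields is None else {f: stored[f] for f in fields}
            result.append(dict(selected))
        return result

    def kv_list(self, partition_id="default"):
        """Map every key in the partition to its tag."""
        return {key: row["tag"] for key, row in self._partitions[partition_id].items()}

    def kv_clear(self, keys, partition_id="default"):
        """Remove the given keys from the partition."""
        for key in keys:
            self._partitions[partition_id].pop(key, None)


store = ToyKVStore()
store.kv_put("sample_0", {"prompt": "2+2=?"}, partition_id="train", tag="prompted")
# A later put on the same key adds a column and refreshes the tag.
store.kv_put("sample_0", {"response": "4"}, partition_id="train", tag="generated")
# Column selection: fetch only the "response" field.
responses = store.kv_batch_get(["sample_0"], partition_id="train", fields=["response"])
```

In this toy model, partitions are separate dictionaries, so a key written to `"train"` is invisible from any other partition, and the tag acts as a lightweight status marker that can be listed without fetching the row data.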

#### StreamingDataLoader API

The primary interaction points are `AsyncTransferQueueClient` and `TransferQueueClient`, serving as the communication interface with the TransferQueue system.
Designed as a drop-in replacement for the standard PyTorch `DataLoader`, this API allows each rank to automatically consume data without single-controller intervention.

Core interfaces:
In this scenario, `TransferQueueController` serves as a side-controller for data dispatching, with a user-defined `Sampler` class organizing the dataflow.
It encapsulates the complex scheduling and data transfer logic required for various parallelism strategies, seamlessly integrating TransferQueue into existing training workflows and simplifying the development of disaggregated frameworks.

- `(async_)get_meta(data_fields: list[str], batch_size:int, partition_id: str, mode: str, task_name:str, sampling_config: Optional[dict[str, Any]]) -> BatchMeta`
- `(async_)get_data(metadata: BatchMeta) -> TensorDict`
- `(async_)put(data: TensorDict, metadata: Optional[BatchMeta], partition_id: Optional[str])`
- `(async_)clear_partition(partition_id: str)` and `(async_)clear_samples(metadata: BatchMeta)`
See [Roadmap](https://github.com/Ascend/TransferQueue/issues/1) and [tutorials/06_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/06_streaming_dataloader.py) for more details.

<span style="color: #FF0000;">**Refer to our [tutorial](https://github.com/Ascend/TransferQueue/tree/main/tutorial) for detailed examples.**</span>
#### Low-Level Native API

The native interface of TransferQueue is implemented in `TransferQueueClient`. It offers maximum flexibility through native, atomic operations.

Developers can leverage `TransferQueueClient` directly to implement advanced features that require fine-grained control and fully streamed data scheduling, as illustrated in the following tutorials:
- [tutorial/03_metadata_concepts.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/03_metadata_concepts.py)
- [tutorial/04_understanding_controller.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/04_understanding_controller.py)
- [tutorial/05_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_custom_sampler.py)


<h2 id="show-cases">🔥 Showcases</h2>

### Collocated Example

@@ -131,7 +161,7 @@ You may refer to the [recipe](https://github.com/Ascend/TransferQueue/tree/dev/r

### Disaggregated Example

We have implemented a series of PRs ([#4](https://github.com/Ascend/TransferQueue/pull/4), [#7](https://github.com/Ascend/TransferQueue/pull/7), [#9](https://github.com/Ascend/TransferQueue/pull/9), [#16](https://github.com/Ascend/TransferQueue/pull/16)) to establish a **standardized, fully-streamed distributed** workflow via TransferQueue.
We have experimentally implemented a **standardized, fully-streamed distributed** workflow via TransferQueue.

By leveraging the `RankAwareSampler` and `StreamingDataLoader` interfaces, we achieve a **streamlined micro-batch-level producer-consumer pipeline**. This design eliminates the need to manually determine data dispatching logic across varying parallelism strategies—a typical complexity in the single-controller paradigm—thereby greatly simplifying framework design.
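
As a rough illustration of why no central dispatcher is needed, each rank can compute locally which micro-batches belong to it. The sketch below is one plausible reading only: `toy_rank_aware_batches` and its parameters (`dp_rank`, `dp_world_size`) are hypothetical names and do not reflect how `RankAwareSampler` is actually implemented.

```python
def toy_rank_aware_batches(num_samples, micro_batch_size, dp_rank, dp_world_size):
    """Yield the micro-batches (lists of sample indices) owned by one rank.

    Hypothetical sketch: micro-batches are assigned round-robin, so rank r
    takes micro-batch r, r + dp_world_size, r + 2 * dp_world_size, ...
    """
    starts = range(0, num_samples, micro_batch_size)
    for batch_index, start in enumerate(starts):
        if batch_index % dp_world_size == dp_rank:
            end = min(start + micro_batch_size, num_samples)
            yield list(range(start, end))


# Two data-parallel ranks splitting 8 samples into micro-batches of 2.
rank0 = list(toy_rank_aware_batches(8, 2, dp_rank=0, dp_world_size=2))
rank1 = list(toy_rank_aware_batches(8, 2, dp_rank=1, dp_world_size=2))
# rank0 == [[0, 1], [4, 5]] and rank1 == [[2, 3], [6, 7]]
```

Because every rank evaluates the same deterministic rule, the assignments are disjoint and together cover all samples without any rank having to ask a controller which data to fetch.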

@@ -186,7 +216,7 @@ pip install TransferQueue
<img src="https://github.com/TransferQueue/community_doc/blob/main/docs/performance_0.1.1.dev2.png?raw=true" width="100%">
</p>

> Note: The above benchmark for TransferQueue is based on our naive `SimpleStorageUnit` backend. By introducing high-performance storage backends and optimizing serialization/deserialization, we expect to achieve even better performance. Warmly welcome contributions from the community!
> Note: The above benchmark for TransferQueue is based on our naive `SimpleStorage` backend. By introducing high-performance storage backends and optimizing serialization/deserialization, we expect to achieve even better performance. We warmly welcome contributions from the community!

For detailed performance benchmarks, please refer to [this blog](https://www.yuque.com/haomingzi-lfse7/hlx5g0/tml8ke0zkgn6roey?singleDoc#).

@@ -250,7 +280,7 @@ batch_meta = client.get_meta(
)
```

<span style="color: #FF0000;">**Refer to [tutorial/04_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/04_custom_sampler.py) for more details.**</span>
<span style="color: #FF0000;">**Refer to [tutorial/05_custom_sampler.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_custom_sampler.py) for more details.**</span>


### How to integrate a new storage backend
@@ -299,21 +329,6 @@ pip install pre-commit
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always
```

<h2 id="roadmap"> 🛣️ Roadmap</h2>

- [x] Support data rewrite for partial rollout & agentic post-training
- [x] Provide a general storage abstraction layer `TransferQueueStorageManager` to manage distributed storage units, which simplifies `Client` design and makes it possible to introduce different storage backends ([PR#66](https://github.com/TransferQueue/TransferQueue/pull/66), [issue#72](https://github.com/TransferQueue/TransferQueue/issues/72))
- [x] Implement `AsyncSimpleStorageManager` as the default storage backend based on the `TransferQueueStorageManager` abstraction
- [x] Provide a `KVStorageManager` to cover all the KV based storage backends ([PR#96](https://github.com/TransferQueue/TransferQueue/pull/96))
- [x] Support topic-based data partitioning to maintain train/val/test data simultaneously ([PR#98](https://github.com/TransferQueue/TransferQueue/pull/98))
- [x] Release the first stable version through PyPI
- [ ] Support disaggregated framework (each rank retrieves its own data without going through a centralized node)
- [ ] Provide a `StreamingDataLoader` interface for disaggregated framework
- [ ] Support load-balancing and dynamic batching
- [x] Support high-performance storage backends for RDMA transmission (e.g., [Mooncake Store](https://github.com/kvcache-ai/Mooncake), [Ray Direct Transport](https://docs.ray.io/en/master/ray-core/direct-transport.html)...)
- [x] High-performance serialization and deserialization
- [ ] More documentation, examples and tutorials

<h2 id="citation">📑 Citation</h2>
Please kindly cite our paper if you find this repo useful:
