[perf] Add zmq.proxy to accelerate request processing for SimpleStorageUnit #37
0oshowero0 merged 9 commits into Ascend:main
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
CLA Signature Pass: 0oshowero0, thanks for your pull request. All authors of the commits have signed the CLA. 👍
Pull request overview
This pull request refactors SimpleStorageUnit from a single-threaded architecture to a multi-threaded design using ZMQ proxy and worker thread pools. The change aims to improve performance by allowing concurrent request processing, eliminating bottlenecks from sequential message handling.
Changes:
- Introduced a native `zmq.proxy` to load-balance between a frontend ROUTER socket and a backend DEALER socket
- Added a worker thread pool where each worker processes PUT/GET/CLEAR requests concurrently
- Added thread synchronization primitives (Lock, Event) to `StorageUnitData` and `SimpleStorageUnit` to prevent race conditions
- Added a `num_worker_threads` configuration parameter (default: 4) to control worker pool size
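The ROUTER/DEALER proxy pattern the PR adopts can be sketched in a few lines of pyzmq. All names here (addresses, worker count, the `ack:` handler) are illustrative stand-ins, not the PR's actual identifiers:

```python
import threading

import zmq

NUM_WORKERS = 4                      # mirrors the num_worker_threads default
FRONTEND_ADDR = "inproc://frontend"  # illustrative; the PR's frontend faces TCP clients
BACKEND_ADDR = "inproc://backend"    # illustrative stand-in for the inproc address

def worker_routine(context: zmq.Context) -> None:
    # Each worker owns its own socket: ZMQ's "share-nothing" concurrency model
    socket = context.socket(zmq.REP)
    socket.connect(BACKEND_ADDR)
    while True:
        try:
            msg = socket.recv()
        except zmq.ContextTerminated:
            socket.close(0)          # context torn down: release socket and exit
            return
        socket.send(b"ack:" + msg)   # stand-in for PUT/GET/CLEAR handling

def demo() -> bytes:
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind(FRONTEND_ADDR)
    backend = context.socket(zmq.DEALER)
    backend.bind(BACKEND_ADDR)       # bind before any worker connects

    for _ in range(NUM_WORKERS):
        threading.Thread(target=worker_routine, args=(context,), daemon=True).start()

    def run_proxy() -> None:
        try:
            zmq.proxy(frontend, backend)  # C-level loop; blocks until termination
        except zmq.ContextTerminated:
            frontend.close(0)
            backend.close(0)

    threading.Thread(target=run_proxy, daemon=True).start()

    client = context.socket(zmq.REQ)
    client.connect(FRONTEND_ADDR)
    client.send(b"ping")
    reply = client.recv()
    client.close(0)
    context.term()  # unblocks proxy and workers; returns once all sockets close
    return reply
```

A request from the REQ client enters the ROUTER, is fair-queued by the DEALER to one REP worker, and the reply is routed back along the stored envelope.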
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| transfer_queue/storage/simple_backend.py | Core refactoring: replaced single-threaded event loop with zmq.proxy and worker pool; added locks and shutdown mechanisms; updated message handling for multi-threaded routing |
| transfer_queue/interface.py | Extracts num_worker_threads from config and passes it to SimpleStorageUnit initialization |
| transfer_queue/config.yaml | Added num_worker_threads configuration option with default value of 4 |
Comments suppressed due to low confidence (2)
transfer_queue/storage/simple_backend.py:106
- The `get_data` method lacks thread-safety protection. While it reads from `self.field_data`, concurrent writes from `put_data` (line 131) or modifications from `clear` could cause race conditions, leading to inconsistent reads or crashes. The lock should be acquired before accessing `self.field_data` to ensure thread-safe reads in the multi-threaded environment.
```python
def get_data(self, fields: list[str], local_indexes: list[int]) -> dict[str, list]:
    """
    Get data from storage unit according to given fields and local_indexes.

    Args:
        fields: Field names used for getting data.
        local_indexes: Local indexes used for getting data.

    Returns:
        dict with field names as keys, corresponding data list as values.
    """
    result: dict[str, list] = {}
    for field in fields:
        # Validate field name
        if field not in self.field_data:
            raise ValueError(
                f"StorageUnitData get_data operation receive invalid field: {field} beyond {self.field_data.keys()}"
            )
        if len(local_indexes) == 1:
            gathered_item = self.field_data[field][local_indexes[0]]
            result[field] = [gathered_item]
        else:
            gathered_items = list(itemgetter(*local_indexes)(self.field_data[field]))
            result[field] = gathered_items
    return result
```
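A minimal sketch of the fix this comment suggests, holding the same lock for reads. The class shape and member names are assumptions reduced from the snippet above:

```python
import threading
from operator import itemgetter

class StorageUnitData:
    """Reduced sketch: reads take the same lock that put_data/clear would use."""

    def __init__(self, storage_size: int) -> None:
        self.storage_size = storage_size
        self.field_data: dict[str, list] = {}
        self._lock = threading.Lock()

    def get_data(self, fields: list[str], local_indexes: list[int]) -> dict[str, list]:
        result: dict[str, list] = {}
        with self._lock:  # serializes against concurrent put_data/clear mutations
            for field in fields:
                if field not in self.field_data:
                    raise ValueError(f"invalid field: {field} beyond {self.field_data.keys()}")
                column = self.field_data[field]
                if len(local_indexes) == 1:
                    result[field] = [column[local_indexes[0]]]
                else:
                    result[field] = list(itemgetter(*local_indexes)(column))
        return result
```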
transfer_queue/storage/simple_backend.py:131
- The double-checked locking pattern used here is not thread-safe in Python. Between the outer check (line 119) and acquiring the lock (line 120), another thread could initialize the field, but more critically, the writes to list elements at line 131 are not protected by any lock. This creates race conditions where concurrent threads can simultaneously write to the same index. The entire write operation after field initialization should be protected by the lock, or use a different synchronization approach.
```python
for f, values in field_data.items():
    # Double-checked locking for field initialization
    if f not in self.field_data:
        with self._lock:
            if f not in self.field_data:
                self.field_data[f] = [None] * self.storage_size
    for i, idx in enumerate(local_indexes):
        if idx < 0 or idx >= self.storage_size:
            raise ValueError(
                f"StorageUnitData put_data operation receive invalid local_index: {idx} beyond "
                f"storage_size: {self.storage_size}"
            )
        self.field_data[f][idx] = values[i]
```
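One way to close the race this comment describes is to widen the critical section so both field initialization and the element writes happen under the lock. A sketch, with names assumed from the snippet above:

```python
import threading

class StorageUnitData:
    """Reduced sketch: the whole put path runs under one lock."""

    def __init__(self, storage_size: int) -> None:
        self.storage_size = storage_size
        self.field_data: dict[str, list] = {}
        self._lock = threading.Lock()

    def put_data(self, field_data: dict[str, list], local_indexes: list[int]) -> None:
        # Validate before taking the lock: the arguments are local to the caller
        for idx in local_indexes:
            if idx < 0 or idx >= self.storage_size:
                raise ValueError(
                    f"invalid local_index: {idx} beyond storage_size: {self.storage_size}"
                )
        with self._lock:  # covers both field initialization and element writes
            for f, values in field_data.items():
                if f not in self.field_data:
                    self.field_data[f] = [None] * self.storage_size
                for i, idx in enumerate(local_indexes):
                    self.field_data[f][idx] = values[i]
```

The double-checked fast path disappears here, trading a little lock overhead for a write path with no unsynchronized window.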
```diff
 def _start_process_put_get(self) -> None:
-    """Create a daemon thread and start put/get process."""
-    self.process_put_get_thread = Thread(
-        target=self._process_put_get, name=f"StorageUnitProcessPutGetThread-{self.storage_unit_id}", daemon=True
+    """Start worker threads and ZMQ proxy for handling requests."""
+
+    # Start worker threads
+    for i in range(self.num_workers):
+        worker_thread = Thread(
+            target=self._worker_routine,
+            args=(i,),
+            name=f"StorageUnitWorkerThread-{self.storage_unit_id}-{i}",
+            daemon=True,
+        )
+        worker_thread.start()
+        self.worker_threads.append(worker_thread)
```
The multi-threaded worker pool is a significant architectural change that introduces concurrency concerns, but there are no tests that explicitly verify thread-safety or concurrent access patterns. The existing test test_put_get_multiple_clients tests multiple clients sequentially, not truly concurrent operations. Consider adding tests that use threading or asyncio to send simultaneous PUT/GET/CLEAR operations to verify thread-safety, race condition handling, and that the worker pool correctly processes concurrent requests without data corruption.
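A concurrency test along the lines suggested might look like the following. `ToyStore` is an illustrative stand-in; a real test would drive SimpleStorageUnit's ZMQ endpoint with concurrent clients:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class ToyStore:
    """Illustrative stand-in for the storage unit under test."""

    def __init__(self, size: int) -> None:
        self._lock = threading.Lock()
        self.data: list = [None] * size

    def put(self, idx: int, value) -> None:
        with self._lock:
            self.data[idx] = value

    def get(self, idx: int):
        with self._lock:
            return self.data[idx]

def test_concurrent_put_get() -> list:
    store = ToyStore(64)

    def put_then_get(i: int) -> int:
        store.put(i, i * 10)   # simultaneous PUTs from many threads
        return store.get(i)    # followed by a GET of the same slot

    with ThreadPoolExecutor(max_workers=8) as pool:
        results = list(pool.map(put_then_get, range(64)))

    # No lost updates or corrupted slots despite concurrent access
    assert results == [i * 10 for i in range(64)]
    return results
```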
```python
self._finalizer = weakref.finalize(
    self,
    self._shutdown_resources,
    self._shutdown_event,
    self.worker_threads,
    self.proxy_thread,
    self.zmq_context,
)
```
The finalizer is registered before the threads and zmq_context are fully initialized. At this point (line 206), self.worker_threads is an empty list, self.proxy_thread is None, and self.zmq_context is None. These values are captured by the finalizer at registration time, not at cleanup time. When garbage collection occurs, the finalizer will attempt to shut down the wrong (empty/None) references instead of the actual running threads and context. The finalizer should be registered after _init_zmq_socket() and _start_process_put_get() complete, or it should pass self and access attributes dynamically.
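The capture-at-registration behavior this comment describes is easy to demonstrate. Below is a minimal sketch of the suggested fix, handing the finalizer a container that is mutated in place rather than attributes that get rebound (all names illustrative):

```python
import weakref

class Unit:
    def __init__(self) -> None:
        # PITFALL: weakref.finalize(self, cb, self.proxy_thread) would capture
        # None forever, because later assignments rebind the attribute, not the
        # object the finalizer already holds.
        self.proxy_thread = None

        # FIX: register against a container mutated in place (or simply register
        # the finalizer only after initialization completes).
        self._state: dict = {"proxy_thread": None, "context": None}
        self._finalizer = weakref.finalize(self, Unit._shutdown, self._state)

        # ... _init_zmq_socket() / _start_process_put_get() would run here ...
        self._state["proxy_thread"] = "proxy-thread"
        self._state["context"] = "zmq-context"

    @staticmethod
    def _shutdown(state: dict):
        # Sees the post-initialization values, not the Nones from registration time
        return (state["proxy_thread"], state["context"])
```

Calling the finalizer (or letting garbage collection trigger it) now observes the live references. Note that `weakref.finalize` must not receive `self` itself in its args, or the object can never be collected.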
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
```diff
     if f not in self.field_data:
-        self.field_data[f] = [None] * self.storage_size
+        with self._lock:
+            if f not in self.field_data:
+                self.field_data[f] = [None] * self.storage_size
```
Why did we use double-check? To avoid overhead of locking?
Multiple threads may enter the outer `if` branch and then contend for the lock; the inner check keeps the later ones from re-initializing the field the first one already created.
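Reduced to its essentials, the pattern under discussion looks like this (an illustrative standalone form of the class's logic):

```python
import threading

_lock = threading.Lock()
field_data: dict[str, list] = {}

def ensure_field(f: str, storage_size: int) -> None:
    # Fast path: once the field exists (the common case), no lock is taken
    if f not in field_data:
        with _lock:
            # Several threads can pass the outer test before one wins the lock;
            # without this re-check, each later thread would overwrite the list
            # the winner just created.
            if f not in field_data:
                field_data[f] = [None] * storage_size
```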
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (2)
transfer_queue/storage/simple_backend.py:131
- The double-checked locking pattern for field initialization is not sufficient to prevent race conditions on the actual data writes at line 131. After the field is initialized (lines 118-122), multiple threads can concurrently write to `self.field_data[f][idx]` without synchronization. This can lead to:
  - Lost updates: if two threads write to the same index simultaneously, one write may be lost.
  - Data corruption: depending on Python's GIL behavior and the data types involved (especially with complex objects like tensors), concurrent writes could corrupt data.

While the GIL provides some protection for simple operations, it is not guaranteed for all scenarios, especially with C extensions (like PyTorch tensors). Consider protecting the entire write operation (lines 124-131) with a lock, or using finer-grained locks per field or index range.
```python
for i, idx in enumerate(local_indexes):
    if idx < 0 or idx >= self.storage_size:
        raise ValueError(
            f"StorageUnitData put_data operation receive invalid local_index: {idx} beyond "
            f"storage_size: {self.storage_size}"
        )
    self.field_data[f][idx] = values[i]
```
transfer_queue/storage/simple_backend.py:106
- The `get_data` method is not thread-safe in a multi-threaded environment. While `put_data` and `clear` use locks for write operations, `get_data` performs reads without any synchronization. This can lead to race conditions where:
  - A worker thread reads from `self.field_data[field]` while another thread is modifying the dictionary structure in `put_data` (field initialization) or `clear`.
  - The check `if field not in self.field_data` at line 92 followed by access at line 98/102 is not atomic, so a concurrent `clear` operation could modify the dictionary between the check and the access.
  - The `itemgetter` operation at line 102 could read partially updated data if another thread is writing to the same indexes.

Consider acquiring a read lock or using a readers-writer lock pattern to protect read operations, especially around dictionary access and list indexing.
```python
def get_data(self, fields: list[str], local_indexes: list[int]) -> dict[str, list]:
    """
    Get data from storage unit according to given fields and local_indexes.

    Args:
        fields: Field names used for getting data.
        local_indexes: Local indexes used for getting data.

    Returns:
        dict with field names as keys, corresponding data list as values.
    """
    result: dict[str, list] = {}
    for field in fields:
        # Validate field name
        if field not in self.field_data:
            raise ValueError(
                f"StorageUnitData get_data operation receive invalid field: {field} beyond {self.field_data.keys()}"
            )
        if len(local_indexes) == 1:
            gathered_item = self.field_data[field][local_indexes[0]]
            result[field] = [gathered_item]
        else:
            gathered_items = list(itemgetter(*local_indexes)(self.field_data[field]))
            result[field] = gathered_items
    return result
```
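Python's standard library has no readers-writer lock, so the suggestion above would need a hand-rolled one. A minimal (writer-starvation-prone) sketch:

```python
import threading

class RWLock:
    """Many concurrent readers, exclusive writers. This minimal form can starve
    writers under heavy read load; production code may want a fairness policy."""

    def __init__(self) -> None:
        self._readers = 0
        self._readers_lock = threading.Lock()  # guards the reader count
        self._writer_lock = threading.Lock()   # held by a writer, or by readers as a group

    def acquire_read(self) -> None:
        with self._readers_lock:
            self._readers += 1
            if self._readers == 1:
                self._writer_lock.acquire()    # first reader locks writers out

    def release_read(self) -> None:
        with self._readers_lock:
            self._readers -= 1
            if self._readers == 0:
                self._writer_lock.release()    # last reader readmits writers

    def acquire_write(self) -> None:
        self._writer_lock.acquire()

    def release_write(self) -> None:
        self._writer_lock.release()
```

Under this scheme `get_data` would wrap its loop in `acquire_read`/`release_read`, while `put_data` and `clear` use the write side.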
```diff
 data_fields = []
 for fname, col_idx in partition.field_name_mapping.items():
-    if col_mask[col_idx]:
+    if col_idx < len(col_mask) and col_mask[col_idx]:
```
This change adds a bounds check to prevent an IndexError when col_idx is out of range for col_mask. While this is a good defensive fix, it appears to be unrelated to the multi-threading changes in this PR.
Consider:
- Moving this fix to a separate PR for easier tracking and review.
- Adding a comment explaining under what conditions `col_idx` might exceed `len(col_mask)`, as this could indicate a data consistency issue elsewhere in the code.
- Adding a warning log when this condition is detected to help identify the root cause.
```python
# Backend: DEALER for worker communication (connected via zmq.proxy)
self.worker_socket = create_zmq_socket(self.zmq_context, zmq.DEALER)
self.worker_socket.bind(self._inproc_addr)
```
The worker socket binding happens in _init_zmq_socket (line 235), but worker threads connect to this address in _worker_routine (line 283). There's a potential race condition where worker threads might try to connect before the backend socket is fully bound and ready.
While there's a retry mechanism in the bind operation (lines 225-231), adding a small delay or verification after the bind at line 235 would ensure the socket is ready before starting worker threads. Alternatively, consider binding the backend socket before starting any threads, or add connection retry logic in the worker threads.
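The bind-before-connect ordering can be made explicit with a `threading.Event`, as this standalone sketch shows (the socket calls are replaced with a list append so it runs on its own):

```python
import threading

backend_ready = threading.Event()
connected: list[int] = []

def worker_routine(worker_id: int) -> None:
    backend_ready.wait()          # block until the backend socket is bound
    connected.append(worker_id)   # stand-in for socket.connect(inproc_addr)

workers = [threading.Thread(target=worker_routine, args=(i,)) for i in range(4)]
for w in workers:
    w.start()

# ... the real code would do backend.bind(self._inproc_addr) here ...
backend_ready.set()               # only now may workers proceed to connect

for w in workers:
    w.join()
```

Worth noting that newer libzmq versions (4.2+) allow connect-before-bind on inproc transports, which may make the retry logic unnecessary there; the Event still makes the intended ordering explicit.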
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
```python
if proxy_thread and proxy_thread.is_alive():
    proxy_thread.join(timeout=5)

# Terminate ZMQ context to unblock proxy and workers
if zmq_context:
    zmq_context.term()
```
proxy_thread does not check shutdown_event proactively. Maybe we should close zmq_context first, since proxy_thread depends on the zmq_context.
By the way, is it necessary for us to explicitly close these threads to ensure complete deallocation?
Indeed it's not strictly necessary, but I believe the explicit destructor helps keep the log clearer when we press Ctrl+C.
Another point is that the underlying C++ code can leverage this explicit termination signal to exit properly, rather than blocking the terminal.
Yes, I am concerned that merely using the shutdown_event flag to exit threads may not be thorough enough.
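The teardown order discussed here (cooperative signal first, then context termination to unblock any blocking calls, then a bounded join) can be exercised in isolation. A sketch with an illustrative inproc address:

```python
import threading

import zmq

ADDR = "inproc://shutdown-demo"  # illustrative address

def worker(context: zmq.Context, shutdown_event: threading.Event) -> None:
    sock = context.socket(zmq.PULL)
    sock.connect(ADDR)
    while not shutdown_event.is_set():
        try:
            sock.recv()              # blocks until a message or termination
        except zmq.ContextTerminated:
            break                    # context.term() unblocked us
    sock.close(0)                    # term() cannot return until this closes

def demo() -> bool:
    context = zmq.Context()
    push = context.socket(zmq.PUSH)
    push.bind(ADDR)
    shutdown_event = threading.Event()
    t = threading.Thread(target=worker, args=(context, shutdown_event))
    t.start()

    shutdown_event.set()  # 1. cooperative signal for threads that poll it
    push.close(0)
    context.term()        # 2. forces the blocked recv() to raise ContextTerminated
    t.join(timeout=5)     # 3. bounded join keeps Ctrl+C responsive
    return not t.is_alive()
```

The `shutdown_event` alone cannot wake a thread parked in `recv()`; `context.term()` provides the hard unblocking that the discussion above is concerned with.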
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Background
Previously, `SimpleStorageUnit` relied on a single-threaded event loop for request processing. This design could lead to bottlenecks and increased latency when multiple requests arrived simultaneously, as operations like ZMQ message deserialization and memory I/O would block the main socket loop from receiving new requests.

Key Changes

- Refactored `SimpleStorageUnit` to utilize a native `zmq.proxy`. This acts as a highly efficient, C-level load balancer between a frontend `ROUTER` socket (handling external client connections) and an internal backend `DEALER` socket (`inproc://`).
- Introduced a worker thread pool where each worker connects its own independent socket to process `PUT`/`GET`/`CLEAR` requests concurrently. This preserves ZMQ's "share-nothing" concurrency philosophy.
- Added a `threading.Lock()` to `StorageUnitData` to prevent race conditions introduced by multi-threading.
- Added `num_worker_threads` as an explicit input parameter for `SimpleStorageUnit` (configurable via TQ system config items).

Architecture
Old Version
New Version
Performance Gain
We provide a simple benchmark script for this PR:
Small Scale Test (`batch_size=20`, `clients=4`)
On a Mac mini with an M2 chip and 24GB memory:
Old Version
New Version
Middle Scale Test (`batch_size=256`, `clients=4`)
On a Mac mini with an M2 chip and 24GB memory:
Old Version
New Version
Large Scale Test (`batch_size=256`, `clients=50`)
On an Ubuntu server with an Intel(R) Xeon(R) Platinum 8358P CPU @ 2.60GHz x 128 cores:
Note: `get` performance
Old Version
New Version