From 4a483b7bd9838472aa4f20d2dc3a5fb5b55be8f9 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Thu, 26 Feb 2026 16:12:09 +0800 Subject: [PATCH 1/9] support multi-thread SimpleStorageUnit Signed-off-by: 0oshowero0 --- transfer_queue/config.yaml | 2 + transfer_queue/interface.py | 6 +- transfer_queue/storage/simple_backend.py | 167 +++++++++++++++++++---- 3 files changed, 144 insertions(+), 31 deletions(-) diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index c0ddfe7..de18189 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -21,6 +21,8 @@ backend: total_storage_size: 100000 # Number of distributed storage units for SimpleStorage backend num_data_storage_units: 2 + # Number of worker threads in each SimpleStorageUnit + num_worker_threads: 4 # ZMQ Server IP & Ports (automatically generated during init) zmq_info: null diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index d415a2f..1bdb7b2 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -74,6 +74,7 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: # initialize SimpleStorageUnit num_data_storage_units = conf.backend.SimpleStorage.num_data_storage_units total_storage_size = conf.backend.SimpleStorage.total_storage_size + num_worker_threads = conf.backend.SimpleStorage.get("num_worker_threads", 4) storage_placement_group = get_placement_group(num_data_storage_units, num_cpus_per_actor=1) for storage_unit_rank in range(num_data_storage_units): @@ -82,7 +83,10 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: placement_group_bundle_index=storage_unit_rank, name=f"TransferQueueStorageUnit#{storage_unit_rank}", lifetime="detached", - ).remote(storage_unit_size=math.ceil(total_storage_size / num_data_storage_units)) + ).remote( + storage_unit_size=math.ceil(total_storage_size / num_data_storage_units), + num_worker_threads=num_worker_threads, + ) _TRANSFER_QUEUE_STORAGE[f"TransferQueueStorageUnit#{storage_unit_rank}"] = storage_node logger.info(f"TransferQueueStorageUnit#{storage_unit_rank} has been created.") diff --git a/transfer_queue/storage/simple_backend.py b/transfer_queue/storage/simple_backend.py index 37dcca4..300d12b 100644 --- a/transfer_queue/storage/simple_backend.py +++ b/transfer_queue/storage/simple_backend.py @@ -16,10 +16,11 @@ import dataclasses import logging import os +import weakref from dataclasses import dataclass from operator import itemgetter -from threading import Thread -from typing import Any +from threading import Event, Lock, Thread +from typing import Any, Optional from uuid import uuid4 import ray @@ -70,6 +71,9 @@ def __init__(self, storage_size: int): # Maximum number of elements stored in storage unit self.storage_size = storage_size + # Lock to prevent race condition + self._lock = Lock() + def get_data(self, fields: list[str], local_indexes: list[int]) -> dict[str, list]: """ Get data from storage unit according to given fields and local_indexes. @@ -111,8 +115,11 @@ def put_data(self, field_data: dict[str, Any], local_indexes: list[int]) -> None """ for f, values in field_data.items(): + # Double-checked locking for field initialization if f not in self.field_data: - self.field_data[f] = [None] * self.storage_size + with self._lock: + if f not in self.field_data: + self.field_data[f] = [None] * self.storage_size for i, idx in enumerate(local_indexes): if idx < 0 or idx >= self.storage_size: @@ -139,9 +146,10 @@ def clear(self, local_indexes: list[int]) -> None: ) # Clear data at specified local_indexes - for f in self.field_data: - for idx in local_indexes: - self.field_data[f][idx] = None + with self._lock: + for f in self.field_data: + for idx in local_indexes: + self.field_data[f][idx] = None @ray.remote(num_cpus=1) @@ -162,27 +170,57 @@ class SimpleStorageUnit: zmq_server_info: ZMQ connection information for clients. """ - def __init__(self, storage_unit_size: int): + def __init__(self, storage_unit_size: int, num_worker_threads: int = 4): """Initialize a SimpleStorageUnit with the specified size. Args: storage_unit_size: Maximum number of elements that can be stored in this storage unit. + num_worker_threads: Number of worker threads for handling requests. """ self.storage_unit_id = f"TQ_STORAGE_UNIT_{uuid4().hex[:8]}" self.storage_unit_size = storage_unit_size self.storage_data = StorageUnitData(self.storage_unit_size) + # Number of worker threads for handling requests + self.num_workers = num_worker_threads + + # Internal communication address for proxy and workers + self._inproc_addr = f"inproc://simple_storage_workers_{self.storage_unit_id}" + + # Lock to protect storage_data access across workers + self._data_lock = Lock() + + # Shutdown event for graceful termination + self._shutdown_event = Event() + + # Placeholder for proxy_thread and worker_threads + self.zmq_context: Optional[zmq.Context] = None + self.proxy_thread: Optional[Thread] = None + self.worker_threads: list[Thread] = [] + self._init_zmq_socket() self._start_process_put_get() + # Register finalizer for graceful cleanup when garbage collected + self._finalizer = weakref.finalize( + self, + self._shutdown_resources, + self._shutdown_event, + self.worker_threads, + self.proxy_thread, + self.zmq_context, + ) + def _init_zmq_socket(self) -> None: """ Initialize ZMQ socket connections between storage unit and controller/clients: - - put_get_socket: - Handle put/get requests from clients. + - put_get_socket (ROUTER): Handle put/get requests from clients. + - worker_socket (DEALER): Backend socket for worker communication. """ self.zmq_context = zmq.Context() + + # Frontend: ROUTER for receiving client requests self.put_get_socket = create_zmq_socket(self.zmq_context, zmq.ROUTER) self._node_ip = get_node_ip_address() @@ -195,6 +233,10 @@ def _init_zmq_socket(self) -> None: logger.warning(f"[{self.storage_unit_id}]: Try to bind ZMQ sockets failed, retrying...") continue + # Backend: DEALER for worker communication (connected via zmq.proxy) + self.worker_socket = create_zmq_socket(self.zmq_context, zmq.DEALER) + self.worker_socket.bind(self._inproc_addr) + self.zmq_server_info = ZMQServerInfo( role=TransferQueueRole.STORAGE, id=str(self.storage_unit_id), @@ -203,33 +245,71 @@ def _init_zmq_socket(self) -> None: ) def _start_process_put_get(self) -> None: - """Create a daemon thread and start put/get process.""" - self.process_put_get_thread = Thread( - target=self._process_put_get, name=f"StorageUnitProcessPutGetThread-{self.storage_unit_id}", daemon=True + """Start worker threads and ZMQ proxy for handling requests.""" + + # Start worker threads + for i in range(self.num_workers): + worker_thread = Thread( + target=self._worker_routine, + args=(i,), + name=f"StorageUnitWorkerThread-{self.storage_unit_id}-{i}", + daemon=True, + ) + worker_thread.start() + self.worker_threads.append(worker_thread) + + # Start proxy thread (ROUTER <-> DEALER) + self.proxy_thread = Thread( + target=self._proxy_routine, + name=f"StorageUnitProxyThread-{self.storage_unit_id}", + daemon=True, ) - self.process_put_get_thread.start() + self.proxy_thread.start() - def _process_put_get(self) -> None: - """Process put_get_socket request.""" - poller = zmq.Poller() - poller.register(self.put_get_socket, zmq.POLLIN) + def _proxy_routine(self) -> None: + """ZMQ proxy for message forwarding between frontend ROUTER and backend DEALER.""" + logger.info(f"[{self.storage_unit_id}]: start ZMQ proxy...") + try: + zmq.proxy(self.put_get_socket, self.worker_socket) + except zmq.ContextTerminated: + logger.info(f"[{self.storage_unit_id}]: ZMQ Proxy stopped gracefully (Context Terminated)") + except Exception as e: + if self._shutdown_event.is_set(): + logger.info(f"[{self.storage_unit_id}]: ZMQ Proxy shutting down...") + else: + logger.error(f"[{self.storage_unit_id}]: ZMQ Proxy unexpected error: {e}") - logger.info(f"[{self.storage_unit_id}]: start processing put/get requests...") + def _worker_routine(self, worker_id: int) -> None: + """Worker thread for processing requests.""" + # Each worker must have its own socket + worker_socket = create_zmq_socket(self.zmq_context, zmq.DEALER) + worker_socket.connect(self._inproc_addr) - perf_monitor = IntervalPerfMonitor(caller_name=self.storage_unit_id) + poller = zmq.Poller() + poller.register(worker_socket, zmq.POLLIN) - while True: + logger.info(f"[{self.storage_unit_id}]: worker {worker_id} started...") + perf_monitor = IntervalPerfMonitor(caller_name=f"{self.storage_unit_id}_worker_{worker_id}") + + while not self._shutdown_event.is_set(): socks = dict(poller.poll(TQ_STORAGE_POLLER_TIMEOUT * 1000)) - if self.put_get_socket in socks: - messages = self.put_get_socket.recv_multipart() - identity = messages.pop(0) - serialized_msg = messages + if self._shutdown_event.is_set(): + break + + if worker_socket in socks: + # Messages received from proxy: [identity, serialized_msg_frame1, ...] + messages = worker_socket.recv_multipart() + identity = messages[0] + serialized_msg = messages[1:] + request_msg = ZMQMessage.deserialize(serialized_msg) operation = request_msg.request_type + try: - logger.debug(f"[{self.storage_unit_id}]: receive operation: {operation}, message: {request_msg}") + logger.debug(f"[{self.storage_unit_id}]: worker {worker_id} received operation: {operation}") + # Process request if operation == ZMQRequestType.PUT_DATA: with perf_monitor.measure(op_type="PUT_DATA"): response_msg = self._handle_put(request_msg) @@ -252,13 +332,14 @@ def _process_put_get(self) -> None: response_msg = ZMQMessage.create( request_type=ZMQRequestType.PUT_GET_ERROR, sender_id=self.storage_unit_id, - body={ - "message": f"Storage unit id #{self.storage_unit_id} occur error in processing " - f"put/get/clear request, detail error message: {str(e)}." - }, + body={"message": f"Worker {worker_id} occur error: {str(e)}."}, ) - self.put_get_socket.send_multipart([identity, *response_msg.serialize()], copy=False) + # Send response back with identity for routing + worker_socket.send_multipart([identity] + response_msg.serialize(), copy=False) + + logger.info(f"[{self.storage_unit_id}]: worker {worker_id} stopped.") + worker_socket.close(linger=0) def _handle_put(self, data_parts: ZMQMessage) -> ZMQMessage: """ @@ -365,6 +446,32 @@ def _handle_clear(self, data_parts: ZMQMessage) -> ZMQMessage: ) return response_msg + @staticmethod + def _shutdown_resources( + shutdown_event: Event, + worker_threads: list[Thread], + proxy_thread: Optional[Thread], + zmq_context: Optional[zmq.Context], + ) -> None: + """Clean up resources on garbage collection.""" + logger.info("Shutting down SimpleStorageUnit resources...") + + # Signal all threads to stop + shutdown_event.set() + + # Terminate ZMQ context to unblock proxy and workers + if zmq_context: + zmq_context.term() + + # Wait for threads to finish (with timeout) + for thread in worker_threads: + if thread and thread.is_alive(): + thread.join(timeout=5) + if proxy_thread and proxy_thread.is_alive(): + proxy_thread.join(timeout=5) + + logger.info("SimpleStorageUnit resources shutdown complete.") + def get_zmq_server_info(self) -> ZMQServerInfo: """Get the ZMQ server information for this storage unit. From 8bcfc3312ba340d4ab24dceb66bd43c616680baa Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Thu, 26 Feb 2026 16:57:32 +0800 Subject: [PATCH 2/9] fix Signed-off-by: 0oshowero0 --- scripts/performance_test.py | 11 ++++------- transfer_queue/storage/simple_backend.py | 10 +++++++++- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/scripts/performance_test.py b/scripts/performance_test.py index 396ddb2..14d06a4 100644 --- a/scripts/performance_test.py +++ b/scripts/performance_test.py @@ -30,14 +30,11 @@ parent_dir = Path(__file__).resolve().parent.parent.parent sys.path.append(str(parent_dir)) - -from transfer_queue import ( # noqa: E402 - SimpleStorageUnit, - TransferQueueClient, - TransferQueueController, - process_zmq_server_info, -) +from transfer_queue.client import TransferQueueClient # noqa: E402 +from transfer_queue.controller import TransferQueueController # noqa: E402 +from transfer_queue.storage.simple_backend import SimpleStorageUnit # noqa: E402 from transfer_queue.utils.common import get_placement_group # noqa: E402 +from transfer_queue.utils.zmq_utils import process_zmq_server_info # noqa: E402 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) diff --git a/transfer_queue/storage/simple_backend.py b/transfer_queue/storage/simple_backend.py index 300d12b..6d85bb4 100644 --- a/transfer_queue/storage/simple_backend.py +++ b/transfer_queue/storage/simple_backend.py @@ -292,7 +292,15 @@ def _worker_routine(self, worker_id: int) -> None: perf_monitor = IntervalPerfMonitor(caller_name=f"{self.storage_unit_id}_worker_{worker_id}") while not self._shutdown_event.is_set(): - socks = dict(poller.poll(TQ_STORAGE_POLLER_TIMEOUT * 1000)) + try: + socks = dict(poller.poll(TQ_STORAGE_POLLER_TIMEOUT * 1000)) + except zmq.error.ContextTerminated: + # ZMQ context was terminated, exit gracefully + logger.info(f"[{self.storage_unit_id}]: worker {worker_id} stopped gracefully (Context Terminated)") + break + except Exception as e: + logger.warning(f"[{self.storage_unit_id}]: worker {worker_id} poll error: {e}") + continue if self._shutdown_event.is_set(): break From c12f590175634aa196b5ac637cc9b837a3f92aa5 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Thu, 26 Feb 2026 17:02:15 +0800 Subject: [PATCH 3/9] fix review Signed-off-by: 0oshowero0 --- transfer_queue/storage/simple_backend.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/transfer_queue/storage/simple_backend.py b/transfer_queue/storage/simple_backend.py index 6d85bb4..3485f8f 100644 --- a/transfer_queue/storage/simple_backend.py +++ b/transfer_queue/storage/simple_backend.py @@ -188,13 +188,10 @@ def __init__(self, storage_unit_size: int, num_worker_threads: int = 4): # Internal communication address for proxy and workers self._inproc_addr = f"inproc://simple_storage_workers_{self.storage_unit_id}" - # Lock to protect storage_data access across workers - self._data_lock = Lock() - # Shutdown event for graceful termination self._shutdown_event = Event() - # Placeholder for proxy_thread and worker_threads + # Placeholder for zmq_context, proxy_thread and worker_threads self.zmq_context: Optional[zmq.Context] = None self.proxy_thread: Optional[Thread] = None self.worker_threads: list[Thread] = [] @@ -340,7 +337,10 @@ def _worker_routine(self, worker_id: int) -> None: response_msg = ZMQMessage.create( request_type=ZMQRequestType.PUT_GET_ERROR, sender_id=self.storage_unit_id, - body={"message": f"Worker {worker_id} occur error: {str(e)}."}, + body={ + "message": f"{self.storage_unit_id}, worker {worker_id} encountered error " + f"during operation {operation}: {str(e)}." + }, ) # Send response back with identity for routing From 7585917c84bbc4b9b8d9d7dcf6a8fe800ec35a28 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Thu, 26 Feb 2026 17:54:44 +0800 Subject: [PATCH 4/9] fix potential issue in controller Signed-off-by: 0oshowero0 --- transfer_queue/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transfer_queue/controller.py b/transfer_queue/controller.py index f7cd239..fdc0840 100644 --- a/transfer_queue/controller.py +++ b/transfer_queue/controller.py @@ -1523,7 +1523,7 @@ def kv_retrieve_keys( ) data_fields = [] for fname, col_idx in partition.field_name_mapping.items(): - if col_mask[col_idx]: + if col_idx < len(col_mask) and col_mask[col_idx]: data_fields.append(fname) metadata = self.generate_batch_meta(partition_id, verified_global_indexes, data_fields, mode="force_fetch") From 51445ed21bf77fe25c2ede9004f8837b45d6d1d8 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Thu, 26 Feb 2026 19:52:36 +0800 Subject: [PATCH 5/9] modify default num_worker_threads Signed-off-by: 0oshowero0 --- transfer_queue/config.yaml | 2 +- transfer_queue/storage/simple_backend.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index de18189..5d0750b 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -22,7 +22,7 @@ backend: # Number of distributed storage units for SimpleStorage backend num_data_storage_units: 2 # Number of worker threads in each SimpleStorageUnit - num_worker_threads: 4 + num_worker_threads: 1 # ZMQ Server IP & Ports (automatically generated during init) zmq_info: null diff --git a/transfer_queue/storage/simple_backend.py b/transfer_queue/storage/simple_backend.py index 3485f8f..42f97ec 100644 --- a/transfer_queue/storage/simple_backend.py +++ b/transfer_queue/storage/simple_backend.py @@ -170,7 +170,7 @@ class SimpleStorageUnit: zmq_server_info: ZMQ connection information for clients. """ - def __init__(self, storage_unit_size: int, num_worker_threads: int = 4): + def __init__(self, storage_unit_size: int, num_worker_threads: int = 1): """Initialize a SimpleStorageUnit with the specified size. Args: From 9ad0c9e3766e1116c3c07dfda109d2c41bee8a48 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Fri, 27 Feb 2026 09:59:44 +0800 Subject: [PATCH 6/9] retire multi-thread Signed-off-by: 0oshowero0 --- transfer_queue/config.yaml | 2 - transfer_queue/interface.py | 2 - transfer_queue/storage/simple_backend.py | 67 +++++++++--------------- 3 files changed, 26 insertions(+), 45 deletions(-) diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index 5d0750b..c0ddfe7 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -21,8 +21,6 @@ backend: total_storage_size: 100000 # Number of distributed storage units for SimpleStorage backend num_data_storage_units: 2 - # Number of worker threads in each SimpleStorageUnit - num_worker_threads: 1 # ZMQ Server IP & Ports (automatically generated during init) zmq_info: null diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 1bdb7b2..23d0bc9 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -74,7 +74,6 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: # initialize SimpleStorageUnit num_data_storage_units = conf.backend.SimpleStorage.num_data_storage_units total_storage_size = conf.backend.SimpleStorage.total_storage_size - num_worker_threads = conf.backend.SimpleStorage.get("num_worker_threads", 4) storage_placement_group = get_placement_group(num_data_storage_units, num_cpus_per_actor=1) for storage_unit_rank in range(num_data_storage_units): @@ -85,7 +84,6 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: lifetime="detached", ).remote( storage_unit_size=math.ceil(total_storage_size / num_data_storage_units), - num_worker_threads=num_worker_threads, ) _TRANSFER_QUEUE_STORAGE[f"TransferQueueStorageUnit#{storage_unit_rank}"] = storage_node logger.info(f"TransferQueueStorageUnit#{storage_unit_rank} has been created.") diff --git a/transfer_queue/storage/simple_backend.py b/transfer_queue/storage/simple_backend.py index 42f97ec..a6cdcfb 100644 --- a/transfer_queue/storage/simple_backend.py +++ b/transfer_queue/storage/simple_backend.py @@ -19,7 +19,7 @@ import weakref from dataclasses import dataclass from operator import itemgetter -from threading import Event, Lock, Thread +from threading import Event, Thread from typing import Any, Optional from uuid import uuid4 @@ -71,9 +71,6 @@ def __init__(self, storage_size: int): # Maximum number of elements stored in storage unit self.storage_size = storage_size - # Lock to prevent race condition - self._lock = Lock() - def get_data(self, fields: list[str], local_indexes: list[int]) -> dict[str, list]: """ Get data from storage unit according to given fields and local_indexes. @@ -115,11 +112,8 @@ def put_data(self, field_data: dict[str, Any], local_indexes: list[int]) -> None """ for f, values in field_data.items(): - # Double-checked locking for field initialization if f not in self.field_data: - with self._lock: - if f not in self.field_data: - self.field_data[f] = [None] * self.storage_size + self.field_data[f] = [None] * self.storage_size for i, idx in enumerate(local_indexes): if idx < 0 or idx >= self.storage_size: @@ -146,10 +140,9 @@ def clear(self, local_indexes: list[int]) -> None: ) # Clear data at specified local_indexes - with self._lock: - for f in self.field_data: - for idx in local_indexes: - self.field_data[f][idx] = None + for f in self.field_data: + for idx in local_indexes: + self.field_data[f][idx] = None @ray.remote(num_cpus=1) @@ -170,21 +163,17 @@ class SimpleStorageUnit: zmq_server_info: ZMQ connection information for clients. """ - def __init__(self, storage_unit_size: int, num_worker_threads: int = 1): + def __init__(self, storage_unit_size: int): """Initialize a SimpleStorageUnit with the specified size. Args: storage_unit_size: Maximum number of elements that can be stored in this storage unit. - num_worker_threads: Number of worker threads for handling requests. """ self.storage_unit_id = f"TQ_STORAGE_UNIT_{uuid4().hex[:8]}" self.storage_unit_size = storage_unit_size self.storage_data = StorageUnitData(self.storage_unit_size) - # Number of worker threads for handling requests - self.num_workers = num_worker_threads - # Internal communication address for proxy and workers self._inproc_addr = f"inproc://simple_storage_workers_{self.storage_unit_id}" @@ -194,7 +183,7 @@ def __init__(self, storage_unit_size: int, num_worker_threads: int = 1): # Placeholder for zmq_context, proxy_thread and worker_threads self.zmq_context: Optional[zmq.Context] = None self.proxy_thread: Optional[Thread] = None - self.worker_threads: list[Thread] = [] + self.worker_thread: Optional[Thread] = None self._init_zmq_socket() self._start_process_put_get() @@ -204,7 +193,7 @@ def __init__(self, storage_unit_size: int, num_worker_threads: int = 1): self, self._shutdown_resources, self._shutdown_event, - self.worker_threads, + self.worker_thread, self.proxy_thread, self.zmq_context, ) @@ -244,16 +233,13 @@ def _init_zmq_socket(self) -> None: def _start_process_put_get(self) -> None: """Start worker threads and ZMQ proxy for handling requests.""" - # Start worker threads - for i in range(self.num_workers): - worker_thread = Thread( - target=self._worker_routine, - args=(i,), - name=f"StorageUnitWorkerThread-{self.storage_unit_id}-{i}", - daemon=True, - ) - worker_thread.start() - self.worker_threads.append(worker_thread) + # Start worker thread + self.worker_thread = Thread( + target=self._worker_routine, + name=f"StorageUnitWorkerThread-{self.storage_unit_id}", + daemon=True, + ) + self.worker_thread.start() # Start proxy thread (ROUTER <-> DEALER) self.proxy_thread = Thread( @@ -276,7 +262,7 @@ def _proxy_routine(self) -> None: else: logger.error(f"[{self.storage_unit_id}]: ZMQ Proxy unexpected error: {e}") - def _worker_routine(self, worker_id: int) -> None: + def _worker_routine(self) -> None: """Worker thread for processing requests.""" # Each worker must have its own socket worker_socket = create_zmq_socket(self.zmq_context, zmq.DEALER) @@ -285,18 +271,18 @@ def _worker_routine(self, worker_id: int) -> None: poller = zmq.Poller() poller.register(worker_socket, zmq.POLLIN) - logger.info(f"[{self.storage_unit_id}]: worker {worker_id} started...") - perf_monitor = IntervalPerfMonitor(caller_name=f"{self.storage_unit_id}_worker_{worker_id}") + logger.info(f"[{self.storage_unit_id}]: worker thread started...") + perf_monitor = IntervalPerfMonitor(caller_name=f"{self.storage_unit_id}") while not self._shutdown_event.is_set(): try: socks = dict(poller.poll(TQ_STORAGE_POLLER_TIMEOUT * 1000)) except zmq.error.ContextTerminated: # ZMQ context was terminated, exit gracefully - logger.info(f"[{self.storage_unit_id}]: worker {worker_id} stopped gracefully (Context Terminated)") + logger.info(f"[{self.storage_unit_id}]: worker stopped gracefully (Context Terminated)") break except Exception as e: - logger.warning(f"[{self.storage_unit_id}]: worker {worker_id} poll error: {e}") + logger.warning(f"[{self.storage_unit_id}]: worker poll error: {e}") continue if self._shutdown_event.is_set(): @@ -312,7 +298,7 @@ def _worker_routine(self, worker_id: int) -> None: operation = request_msg.request_type try: - logger.debug(f"[{self.storage_unit_id}]: worker {worker_id} received operation: {operation}") + logger.debug(f"[{self.storage_unit_id}]: worker received operation: {operation}") # Process request if operation == ZMQRequestType.PUT_DATA: @@ -338,7 +324,7 @@ def _worker_routine(self, worker_id: int) -> None: request_type=ZMQRequestType.PUT_GET_ERROR, sender_id=self.storage_unit_id, body={ - "message": f"{self.storage_unit_id}, worker {worker_id} encountered error " + "message": f"{self.storage_unit_id}, worker encountered error " f"during operation {operation}: {str(e)}." }, ) @@ -346,7 +332,7 @@ def _worker_routine(self, worker_id: int) -> None: # Send response back with identity for routing worker_socket.send_multipart([identity] + response_msg.serialize(), copy=False) - logger.info(f"[{self.storage_unit_id}]: worker {worker_id} stopped.") + logger.info(f"[{self.storage_unit_id}]: worker stopped.") worker_socket.close(linger=0) def _handle_put(self, data_parts: ZMQMessage) -> ZMQMessage: @@ -457,7 +443,7 @@ def _handle_clear(self, data_parts: ZMQMessage) -> ZMQMessage: @staticmethod def _shutdown_resources( shutdown_event: Event, - worker_threads: list[Thread], + worker_thread: Optional[Thread], proxy_thread: Optional[Thread], zmq_context: Optional[zmq.Context], ) -> None: @@ -472,9 +458,8 @@ def _shutdown_resources( zmq_context.term() # Wait for threads to finish (with timeout) - for thread in worker_threads: - if thread and thread.is_alive(): - thread.join(timeout=5) + if worker_thread and worker_thread.is_alive(): + worker_thread.join(timeout=5) if proxy_thread and proxy_thread.is_alive(): proxy_thread.join(timeout=5) From f6a16f45a736cb9cf66980274a99896dc27faf1f Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Fri, 27 Feb 2026 10:10:11 +0800 Subject: [PATCH 7/9] fix minor issues Signed-off-by: 0oshowero0 --- transfer_queue/storage/simple_backend.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/transfer_queue/storage/simple_backend.py b/transfer_queue/storage/simple_backend.py index a6cdcfb..a64856b 100644 --- a/transfer_queue/storage/simple_backend.py +++ b/transfer_queue/storage/simple_backend.py @@ -333,6 +333,7 @@ def _worker_routine(self) -> None: worker_socket.send_multipart([identity] + response_msg.serialize(), copy=False) logger.info(f"[{self.storage_unit_id}]: worker stopped.") + poller.unregister(worker_socket) worker_socket.close(linger=0) def _handle_put(self, data_parts: ZMQMessage) -> ZMQMessage: @@ -453,16 +454,16 @@ def _shutdown_resources( # Signal all threads to stop shutdown_event.set() - # Terminate ZMQ context to unblock proxy and workers - if zmq_context: - zmq_context.term() - # Wait for threads to finish (with timeout) if worker_thread and worker_thread.is_alive(): worker_thread.join(timeout=5) if proxy_thread and proxy_thread.is_alive(): proxy_thread.join(timeout=5) + # Terminate ZMQ context to unblock proxy and workers + if zmq_context: + zmq_context.term() + logger.info("SimpleStorageUnit resources shutdown complete.") def get_zmq_server_info(self) -> ZMQServerInfo: From 5c20b4ede0e7e5284d48c4b6b7334cbb4a08ca66 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Fri, 27 Feb 2026 15:24:28 +0800 Subject: [PATCH 8/9] fix comments Signed-off-by: 0oshowero0 --- transfer_queue/storage/simple_backend.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/transfer_queue/storage/simple_backend.py b/transfer_queue/storage/simple_backend.py index a64856b..116c8be 100644 --- a/transfer_queue/storage/simple_backend.py +++ b/transfer_queue/storage/simple_backend.py @@ -16,6 +16,7 @@ import dataclasses import logging import os +import time import weakref from dataclasses import dataclass from operator import itemgetter @@ -241,6 +242,8 @@ def _start_process_put_get(self) -> None: ) self.worker_thread.start() + time.sleep(0.5) # make sure worker thread is ready before zmq.proxy forwarding messages + # Start proxy thread (ROUTER <-> DEALER) self.proxy_thread = Thread( target=self._proxy_routine, @@ -454,16 +457,16 @@ def _shutdown_resources( # Signal all threads to stop shutdown_event.set() + # Terminate ZMQ context to unblock proxy and workers + if zmq_context: + zmq_context.term() + # Wait for threads to finish (with timeout) if worker_thread and worker_thread.is_alive(): worker_thread.join(timeout=5) if proxy_thread and proxy_thread.is_alive(): proxy_thread.join(timeout=5) - # Terminate ZMQ context to unblock proxy and workers - if zmq_context: - zmq_context.term() - logger.info("SimpleStorageUnit resources shutdown complete.") def get_zmq_server_info(self) -> ZMQServerInfo: From f3d28a9d93c50984b9cfc534278935416d295f66 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Fri, 27 Feb 2026 15:34:46 +0800 Subject: [PATCH 9/9] fix minor bug Signed-off-by: 0oshowero0 --- transfer_queue/storage/simple_backend.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/transfer_queue/storage/simple_backend.py b/transfer_queue/storage/simple_backend.py index 116c8be..ed12d54 100644 --- a/transfer_queue/storage/simple_backend.py +++ b/transfer_queue/storage/simple_backend.py @@ -183,6 +183,7 @@ def __init__(self, storage_unit_size: int): # Placeholder for zmq_context, proxy_thread and worker_threads self.zmq_context: Optional[zmq.Context] = None + self.put_get_socket: Optional[zmq.Socket] = None self.proxy_thread: Optional[Thread] = None self.worker_thread: Optional[Thread] = None @@ -197,6 +198,7 @@ def __init__(self, storage_unit_size: int): self.worker_thread, self.proxy_thread, self.zmq_context, + self.put_get_socket, ) def _init_zmq_socket(self) -> None: @@ -450,6 +452,7 @@ def _shutdown_resources( worker_thread: Optional[Thread], proxy_thread: Optional[Thread], zmq_context: Optional[zmq.Context], + put_get_socket: Optional[zmq.Socket], ) -> None: """Clean up resources on garbage collection.""" logger.info("Shutting down SimpleStorageUnit resources...") @@ -457,6 +460,10 @@ def _shutdown_resources( # Signal all threads to stop shutdown_event.set() + # Terminate put_get_socket + if put_get_socket: + put_get_socket.close(linger=0) + # Terminate ZMQ context to unblock proxy and workers if zmq_context: zmq_context.term()