Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion pybela/Streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,47 @@ async def _async_stop_streaming(self, variables=[]):
if not _previous_streaming_mode == "PEEK":
_print_info(f"Stopped monitoring variables {variables}...")

self._processed_data_msg_queue = asyncio.Queue() # clear processed data queue
# collect any remaining flushed buffers before clearing the queue
# this assures users don't lose final data chunks when stopping streaming
try:
remaining_messages = []
# safely drain the queue without blocking
while not self._processed_data_msg_queue.empty():
try:
msg = self._processed_data_msg_queue.get_nowait()
remaining_messages.append(msg)
self._processed_data_msg_queue.task_done()
except asyncio.QueueEmpty:
# queue became empty during iteration, which is fine
break
except Exception:
# any other error, stop processing but don't fail
break

# process remaining messages by adding them to streaming buffers
# this preserves the data for user access via streaming_buffers_queue
for msg in remaining_messages:
try:
var_name = msg.get("name")
buffer_data = msg.get("buffer")
# only process if we have valid data and the variable exists in our buffers
if (var_name and buffer_data is not None and
hasattr(self, 'streaming_buffers_queue') and
self.streaming_buffers_queue and
var_name in self.streaming_buffers_queue):
self.streaming_buffers_queue[var_name].append(buffer_data)
except Exception:
# if individual message processing fails, continue with others
# this ensures one bad message doesn't break the entire cleanup
continue

except Exception:
# if the entire buffer collection process fails, don't break stop_streaming
# fall back to original behavior (queue clearing without collection)
pass

# always clear the queue, regardless of collection success/failure
self._processed_data_msg_queue = asyncio.Queue()
self._on_buffer_callback_is_active = False
if self._on_buffer_callback_worker_task:
self._on_buffer_callback_worker_task.cancel()
Expand Down