From 100e0b88ed4d081ce6c2158d725a49bfd5208711 Mon Sep 17 00:00:00 2001 From: Klee Dienes Date: Sat, 29 Apr 2023 16:06:59 +0000 Subject: [PATCH] Convert _send_json to async --- entanglement/protocol/__init__.py | 14 +++++++++----- entanglement/websocket.py | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/entanglement/protocol/__init__.py b/entanglement/protocol/__init__.py index 41d8a63..bed951a 100644 --- a/entanglement/protocol/__init__.py +++ b/entanglement/protocol/__init__.py @@ -183,21 +183,25 @@ async def _run_sync(self): try: while True: elt = self.current_dirty.pop() - try:self._send_sync_message(elt) + try: + await self._send_sync_message(elt) except: logger.exception("Error sending {}".format(repr(elt.obj))) if self.waiter: await self.waiter except StopIteration: #empty set self.task = None - self._send_sync_message(None) #Send metadata only message if useful + await self._send_sync_message(None) #Send metadata only message if useful if self.drain_future: self.drain_future.set_result(True) self.drain_future = None self.current_dirty = self.dirty if len(self.dirty) > 0: self.task = self.loop.create_task(self._run_sync()) + except: + logger.exception(f'error in _run_sync') + raise - def _send_sync_message(self, elt): + async def _send_sync_message(self, elt): flags = 0 responses_to = None if elt and elt.response_for: @@ -217,7 +221,7 @@ def _send_sync_message(self, elt): sync_rep = {} new_flags = self._handle_meta_out(flags, sync_rep) if len(sync_rep) == 0: return - self._send_json(sync_rep, new_flags) + await self._send_json(sync_rep, new_flags) self._out_counter += 1 @@ -326,7 +330,7 @@ def __init__(self, manager, incoming = False, dest = None, **kwargs): self.reader = asyncio.StreamReader(loop = self.loop) self.reader_task = None - def _send_json(self, sync_rep, flags): + async def _send_json(self, sync_rep, flags): js = bytes(json.dumps(sync_rep), 'utf-8') protocol_logger.debug("#{c}: Sending `{js}' to {d} (flags {f})".format( js = js, d = self.dest, diff --git a/entanglement/websocket.py b/entanglement/websocket.py index 55b40f2..22690cb 100644 --- a/entanglement/websocket.py +++ b/entanglement/websocket.py @@ -102,7 +102,7 @@ def close(self): self.ws_handler.close() self.connection_lost(None) - def _send_json(self, sync_rep, flags): + async def _send_json(self, sync_rep, flags): sync_rep['_flags'] = int(flags) js = bytes(json.dumps(sync_rep), 'utf-8') protocol_logger.debug("#{c}: Sending `{js}' to {d} (flags {f})".format(