From 3c4261c1e513d18c6d724b85d532f797d74efa32 Mon Sep 17 00:00:00 2001 From: Vlad Lisnyi Date: Tue, 3 Jun 2025 21:46:53 +0200 Subject: [PATCH] Improved closure/unload of listenning/sending workers --- custom_components/myhome/__init__.py | 10 +++++-- custom_components/myhome/gateway.py | 41 ++++++++++++++++++++-------- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/custom_components/myhome/__init__.py b/custom_components/myhome/__init__.py index e5bdbba..ddd985c 100644 --- a/custom_components/myhome/__init__.py +++ b/custom_components/myhome/__init__.py @@ -279,10 +279,14 @@ async def async_unload_entry(hass, entry): for platform in hass.data[DOMAIN][entry.data[CONF_MAC]][CONF_PLATFORMS].keys(): await hass.config_entries.async_forward_entry_unload(entry, platform) - hass.services.async_remove(DOMAIN, "sync_time") - hass.services.async_remove(DOMAIN, "send_message") + if hass.services.has_service(DOMAIN, "sync_time"): + hass.services.async_remove(DOMAIN, "sync_time") + + if hass.services.has_service(DOMAIN, "send_message"): + hass.services.async_remove(DOMAIN, "send_message") gateway_handler = hass.data[DOMAIN][entry.data[CONF_MAC]].pop(CONF_ENTITY) del hass.data[DOMAIN][entry.data[CONF_MAC]] - return await gateway_handler.close_listener() + await gateway_handler.stop() # Stop all asyncio tasks + return True diff --git a/custom_components/myhome/gateway.py b/custom_components/myhome/gateway.py index 821a20d..a53d531 100644 --- a/custom_components/myhome/gateway.py +++ b/custom_components/myhome/gateway.py @@ -360,9 +360,6 @@ async def listening_loop(self): await _event_session.close() self.is_connected = False - LOGGER.debug("%s Destroying listening worker.", self.log_id) - self.listening_worker.cancel() - async def sending_loop(self, worker_id: int): self._terminate_sender = False @@ -378,24 +375,17 @@ async def sending_loop(self, worker_id: int): while not self._terminate_sender: task = await self.send_buffer.get() LOGGER.debug( - "%s Message `%s` was successfully unqueued by worker %s.", + "%s Message `%s` was successfully unqueued by worker %s. Task %s", self.name, self.gateway.host, - task["message"], worker_id, + task["message"], ) await _command_session.send(message=task["message"], is_status_request=task["is_status_request"]) self.send_buffer.task_done() await _command_session.close() - LOGGER.debug( - "%s Destroying sending worker %s", - self.log_id, - worker_id, - ) - self.sending_workers[worker_id].cancel() - async def close_listener(self) -> bool: LOGGER.info("%s Closing event listener", self.log_id) self._terminate_sender = True @@ -418,3 +408,30 @@ async def send_status_request(self, message: OWNCommand): self.log_id, message, ) + + async def stop(self): + """Properly stop all background asyncio workers.""" + LOGGER.debug("%s Stopping gateway workers...", self.log_id) + + self._terminate_listener = True + self._terminate_sender = True + + if self.listening_worker: + LOGGER.debug("%s Cancelling listening worker...", self.log_id) + self.listening_worker.cancel() + try: + await self.listening_worker + LOGGER.debug("%s Listening worker stopped cleanly.", self.log_id) + except asyncio.CancelledError: + LOGGER.debug("%s Listening worker was cancelled.", self.log_id) + + for idx, task in enumerate(self.sending_workers): + LOGGER.debug("%s Cancelling sending worker %d...", self.log_id, idx) + task.cancel() + try: + await task + LOGGER.debug("%s Sending worker %d stopped cleanly.", self.log_id, idx) + except asyncio.CancelledError: + LOGGER.debug("%s Sending worker %d was cancelled.", self.log_id, idx) + + LOGGER.debug("%s All gateway workers stopped.", self.log_id)