Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions custom_components/myhome/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,14 @@ async def async_unload_entry(hass, entry):
for platform in hass.data[DOMAIN][entry.data[CONF_MAC]][CONF_PLATFORMS].keys():
await hass.config_entries.async_forward_entry_unload(entry, platform)

hass.services.async_remove(DOMAIN, "sync_time")
hass.services.async_remove(DOMAIN, "send_message")
if hass.services.has_service(DOMAIN, "sync_time"):
hass.services.async_remove(DOMAIN, "sync_time")

if hass.services.has_service(DOMAIN, "send_message"):
hass.services.async_remove(DOMAIN, "send_message")

gateway_handler = hass.data[DOMAIN][entry.data[CONF_MAC]].pop(CONF_ENTITY)
del hass.data[DOMAIN][entry.data[CONF_MAC]]

return await gateway_handler.close_listener()
await gateway_handler.stop() # Stop all asyncio tasks
return True
41 changes: 29 additions & 12 deletions custom_components/myhome/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,6 @@ async def listening_loop(self):
await _event_session.close()
self.is_connected = False

LOGGER.debug("%s Destroying listening worker.", self.log_id)
self.listening_worker.cancel()

async def sending_loop(self, worker_id: int):
self._terminate_sender = False

Expand All @@ -378,24 +375,17 @@ async def sending_loop(self, worker_id: int):
while not self._terminate_sender:
task = await self.send_buffer.get()
LOGGER.debug(
"%s Message `%s` was successfully unqueued by worker %s.",
"%s Message `%s` was successfully unqueued by worker %s. Task %s",
self.name,
self.gateway.host,
task["message"],
worker_id,
task["message"],
)
await _command_session.send(message=task["message"], is_status_request=task["is_status_request"])
self.send_buffer.task_done()

await _command_session.close()

LOGGER.debug(
"%s Destroying sending worker %s",
self.log_id,
worker_id,
)
self.sending_workers[worker_id].cancel()

async def close_listener(self) -> bool:
LOGGER.info("%s Closing event listener", self.log_id)
self._terminate_sender = True
Expand All @@ -418,3 +408,30 @@ async def send_status_request(self, message: OWNCommand):
self.log_id,
message,
)

async def stop(self):
"""Properly stop all background asyncio workers."""
LOGGER.debug("%s Stopping gateway workers...", self.log_id)

self._terminate_listener = True
self._terminate_sender = True

if self.listening_worker:
LOGGER.debug("%s Cancelling listening worker...", self.log_id)
self.listening_worker.cancel()
try:
await self.listening_worker
LOGGER.debug("%s Listening worker stopped cleanly.", self.log_id)
except asyncio.CancelledError:
LOGGER.debug("%s Listening worker was cancelled.", self.log_id)

for idx, task in enumerate(self.sending_workers):
LOGGER.debug("%s Cancelling sending worker %d...", self.log_id, idx)
task.cancel()
try:
await task
LOGGER.debug("%s Sending worker %d stopped cleanly.", self.log_id, idx)
except asyncio.CancelledError:
LOGGER.debug("%s Sending worker %d was cancelled.", self.log_id, idx)

LOGGER.debug("%s All gateway workers stopped.", self.log_id)