Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 30 additions & 65 deletions OWNd/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,82 +684,47 @@ async def connect_to_gateway(cls, gateway: OWNGateway):
connection = cls(gateway)
await connection.connect()

async def send(self, message: str, is_status_request: bool = False):
async def send(self, message, is_status_request: bool = False, trial_number: int = 1):
"""Send the attached message on an existing 'command' connection,
actively reconnecting it if it had been reset."""

try:

self._stream_writer.write(str(message).encode())
await self._stream_writer.drain()
raw_response = await self._stream_reader.readuntil(OWNSession.SEPARATOR)
resulting_message = OWNMessage.parse(raw_response.decode())
if (
isinstance(resulting_message, OWNSignaling)
and resulting_message.is_nack()
):
self._stream_writer.write(str(message).encode())
await self._stream_writer.drain()

while True:
raw_response = await self._stream_reader.readuntil(OWNSession.SEPARATOR)
resulting_message = OWNSignaling(raw_response.decode())
if resulting_message.is_nack():
self._logger.error(
"%s Could not send message `%s`.", self._gateway.log_id, message
)
elif resulting_message.is_ack():
if not is_status_request:
self._logger.info(
"%s Message `%s` was successfully sent.",
self._gateway.log_id,
message,
)
else:
self._logger.debug(
"%s Message `%s` was successfully sent.",
self._gateway.log_id,
message,
)
elif (
isinstance(resulting_message, OWNSignaling)
and resulting_message.is_ack()
):
if not is_status_request:
self._logger.info(
"%s Message `%s` was successfully sent.",
self._gateway.log_id,
message,
)
resulting_message = OWNMessage.parse(raw_response.decode())

if isinstance(resulting_message, OWNSignaling):
# We finally got ACK or NACK
if resulting_message.is_nack():
if trial_number > 2:
self._logger.error(
"%s Could not send message `%s`. No more retries.", self._gateway.log_id, message
)
break
else:
self._logger.error(
"%s Could not send message `%s`. Retrying (%d)...", self._gateway.log_id, message,
trial_number
)
return await self.send(message, is_status_request, trial_number + 1)
elif resulting_message.is_ack():
log_message = "%s Message `%s` was successfully sent."
if not is_status_request:
self._logger.info(log_message, self._gateway.log_id, message)
else:
self._logger.debug(log_message, self._gateway.log_id, message)
break
else:
# We've encountered some different reply. Typical for heating queries which follow with burst of messages
self._logger.debug(
"%s Message `%s` was successfully sent.",
"%s Got non-signaling message `%s`.",
self._gateway.log_id,
message,
)
else:
self._logger.debug(
"%s Message `%s` received response `%s`.",
self._gateway.log_id,
message,
resulting_message,
)
raw_response = await self._stream_reader.readuntil(OWNSession.SEPARATOR)
resulting_message = OWNSignaling(raw_response.decode())
if resulting_message.is_nack():
self._logger.error(
"%s Could not send message `%s`.", self._gateway.log_id, message
resulting_message,
)
elif resulting_message.is_ack():
if not is_status_request:
self._logger.info(
"%s Message `%s` was successfully sent.",
self._gateway.log_id,
message,
)
else:
self._logger.debug(
"%s Message `%s` was successfully sent.",
self._gateway.log_id,
message,
)

except (ConnectionResetError, asyncio.IncompleteReadError):
self._logger.debug(
Expand Down