From 4f08dc81facbb12b4baf7ca135b7864a52cafa86 Mon Sep 17 00:00:00 2001 From: evgeny Date: Wed, 27 Aug 2025 13:14:33 +0100 Subject: [PATCH] feat: add async and sync assert waiter utilities and update tests to remove temporary delays --- ably/scripts/unasync.py | 1 + test/ably/rest/restchannelpublish_test.py | 44 +++++++++-------- test/ably/utils.py | 57 ++++++++++++++++++++++- 3 files changed, 81 insertions(+), 21 deletions(-) diff --git a/ably/scripts/unasync.py b/ably/scripts/unasync.py index ed148742..72126f41 100644 --- a/ably/scripts/unasync.py +++ b/ably/scripts/unasync.py @@ -239,6 +239,7 @@ def run(): _TOKEN_REPLACE["AsyncClient"] = "Client" _TOKEN_REPLACE["aclose"] = "close" + _TOKEN_REPLACE["assert_waiter"] = "assert_waiter_sync" _IMPORTS_REPLACE["ably"] = "ably.sync" diff --git a/test/ably/rest/restchannelpublish_test.py b/test/ably/rest/restchannelpublish_test.py index a6099cb2..4abb7381 100644 --- a/test/ably/rest/restchannelpublish_test.py +++ b/test/ably/rest/restchannelpublish_test.py @@ -9,7 +9,6 @@ import mock import msgpack import pytest -import asyncio from ably import api_version from ably import AblyException, IncompatibleClientIdException @@ -20,7 +19,7 @@ from test.ably import utils from test.ably.testapp import TestApp -from test.ably.utils import VaryByProtocolTestsMetaclass, dont_vary_protocol, BaseAsyncTestCase +from test.ably.utils import VaryByProtocolTestsMetaclass, dont_vary_protocol, BaseAsyncTestCase, assert_waiter log = logging.getLogger(__name__) @@ -402,26 +401,31 @@ async def test_interoperability(self): expected_value = input_msg.get('expectedValue') # 1) - await channel.publish(data=expected_value) - # temporary added delay, we need to investigate why messages don't appear immediately - await asyncio.sleep(1) - async with httpx.AsyncClient(http2=True) as client: - r = await client.get(url, auth=auth) - item = r.json()[0] - assert item.get('encoding') == encoding - if encoding == 'json': - assert json.loads(item['data']) == json.loads(msg_data) - else: - assert item['data'] == msg_data + response = await channel.publish(data=expected_value) + assert response.status_code == 201 + + async def check_data(): + async with httpx.AsyncClient(http2=True) as client: + r = await client.get(url, auth=auth) + item = r.json()[0] + encoding_is_correct = item.get('encoding') == encoding + if encoding == 'json': + return encoding_is_correct and json.loads(item['data']) == json.loads(msg_data) + else: + return encoding_is_correct and item['data'] == msg_data + + await assert_waiter(check_data) # 2) - await channel.publish(messages=[Message(data=msg_data, encoding=encoding)]) - # temporary added delay, we need to investigate why messages don't appear immediately - await asyncio.sleep(1) - history = await channel.history() - message = history.items[0] - assert message.data == expected_value - assert type(message.data) == type_mapping[expected_type] + response = await channel.publish(messages=[Message(data=msg_data, encoding=encoding)]) + assert response.status_code == 201 + + async def check_history(): + history = await channel.history() + message = history.items[0] + return message.data == expected_value and type(message.data) == type_mapping[expected_type] + + await assert_waiter(check_history) # https://github.com/ably/ably-python/issues/130 async def test_publish_slash(self): diff --git a/test/ably/utils.py b/test/ably/utils.py index 0edddb90..51b07aab 100644 --- a/test/ably/utils.py +++ b/test/ably/utils.py @@ -1,9 +1,12 @@ +import asyncio import functools import os import random import string -import unittest import sys +import time +import unittest +from typing import Callable, Awaitable if sys.version_info >= (3, 8): from unittest import IsolatedAsyncioTestCase @@ -178,3 +181,55 @@ def get_submodule_dir(filepath): if os.path.exists(os.path.join(root_dir, 'submodules')): return os.path.join(root_dir, 'submodules') root_dir = os.path.dirname(root_dir) + + +async def assert_waiter(block: Callable[[], Awaitable[bool]], timeout: float = 10) -> None: + """ + Polls a condition until it succeeds or times out. + Args: + block: A callable that returns a boolean indicating success + timeout: Maximum time to wait in seconds (default: 10) + Raises: + TimeoutError: If condition not met within timeout + """ + try: + await asyncio.wait_for(_poll_until_success(block), timeout=timeout) + except asyncio.TimeoutError: + raise asyncio.TimeoutError(f"Condition not met within {timeout}s") + + +async def _poll_until_success(block: Callable[[], Awaitable[bool]]) -> None: + while True: + try: + success = await block() + if success: + break + except Exception: + pass + + await asyncio.sleep(0.1) + + +def assert_waiter_sync(block: Callable[[], bool], timeout: float = 10) -> None: + """ + Blocking version of assert_waiter that polls a condition until it succeeds or times out. + Args: + block: A callable that returns a boolean indicating success + timeout: Maximum time to wait in seconds (default: 10) + Raises: + TimeoutError: If condition not met within timeout + """ + start_time = time.time() + + while True: + try: + success = block() + if success: + break + except Exception: + pass + + if time.time() - start_time >= timeout: + raise TimeoutError(f"Condition not met within {timeout}s") + + time.sleep(0.1)