Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ably/scripts/unasync.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ def run():

_TOKEN_REPLACE["AsyncClient"] = "Client"
_TOKEN_REPLACE["aclose"] = "close"
_TOKEN_REPLACE["assert_waiter"] = "assert_waiter_sync"

_IMPORTS_REPLACE["ably"] = "ably.sync"

Expand Down
44 changes: 24 additions & 20 deletions test/ably/rest/restchannelpublish_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import mock
import msgpack
import pytest
import asyncio

from ably import api_version
from ably import AblyException, IncompatibleClientIdException
Expand All @@ -20,7 +19,7 @@
from test.ably import utils

from test.ably.testapp import TestApp
from test.ably.utils import VaryByProtocolTestsMetaclass, dont_vary_protocol, BaseAsyncTestCase
from test.ably.utils import VaryByProtocolTestsMetaclass, dont_vary_protocol, BaseAsyncTestCase, assert_waiter

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -402,26 +401,31 @@ async def test_interoperability(self):
expected_value = input_msg.get('expectedValue')

# 1)
await channel.publish(data=expected_value)
# temporary added delay, we need to investigate why messages don't appear immediately
await asyncio.sleep(1)
async with httpx.AsyncClient(http2=True) as client:
r = await client.get(url, auth=auth)
item = r.json()[0]
assert item.get('encoding') == encoding
if encoding == 'json':
assert json.loads(item['data']) == json.loads(msg_data)
else:
assert item['data'] == msg_data
response = await channel.publish(data=expected_value)
assert response.status_code == 201

async def check_data():
async with httpx.AsyncClient(http2=True) as client:
r = await client.get(url, auth=auth)
item = r.json()[0]
encoding_is_correct = item.get('encoding') == encoding
if encoding == 'json':
return encoding_is_correct and json.loads(item['data']) == json.loads(msg_data)
else:
return encoding_is_correct and item['data'] == msg_data

await assert_waiter(check_data)

# 2)
await channel.publish(messages=[Message(data=msg_data, encoding=encoding)])
# temporary added delay, we need to investigate why messages don't appear immediately
await asyncio.sleep(1)
history = await channel.history()
message = history.items[0]
assert message.data == expected_value
assert type(message.data) == type_mapping[expected_type]
response = await channel.publish(messages=[Message(data=msg_data, encoding=encoding)])
assert response.status_code == 201

async def check_history():
history = await channel.history()
message = history.items[0]
return message.data == expected_value and type(message.data) == type_mapping[expected_type]

await assert_waiter(check_history)

# https://github.com/ably/ably-python/issues/130
async def test_publish_slash(self):
Expand Down
57 changes: 56 additions & 1 deletion test/ably/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
import functools
import os
import random
import string
import unittest
import sys
import time
import unittest
from typing import Callable, Awaitable

if sys.version_info >= (3, 8):
from unittest import IsolatedAsyncioTestCase
Expand Down Expand Up @@ -178,3 +181,55 @@ def get_submodule_dir(filepath):
if os.path.exists(os.path.join(root_dir, 'submodules')):
return os.path.join(root_dir, 'submodules')
root_dir = os.path.dirname(root_dir)


async def assert_waiter(block: Callable[[], Awaitable[bool]], timeout: float = 10) -> None:
"""
Polls a condition until it succeeds or times out.
Args:
block: A callable that returns a boolean indicating success
timeout: Maximum time to wait in seconds (default: 10)
Raises:
TimeoutError: If condition not met within timeout
"""
try:
await asyncio.wait_for(_poll_until_success(block), timeout=timeout)
except asyncio.TimeoutError:
raise asyncio.TimeoutError(f"Condition not met within {timeout}s")


async def _poll_until_success(block: Callable[[], Awaitable[bool]]) -> None:
while True:
try:
success = await block()
if success:
break
except Exception:
pass

await asyncio.sleep(0.1)


def assert_waiter_sync(block: Callable[[], bool], timeout: float = 10) -> None:
"""
Blocking version of assert_waiter that polls a condition until it succeeds or times out.
Args:
block: A callable that returns a boolean indicating success
timeout: Maximum time to wait in seconds (default: 10)
Raises:
TimeoutError: If condition not met within timeout
"""
start_time = time.time()

while True:
try:
success = block()
if success:
break
except Exception:
pass

if time.time() - start_time >= timeout:
raise TimeoutError(f"Condition not met within {timeout}s")

time.sleep(0.1)