diff --git a/bluez_peripheral/advert.py b/bluez_peripheral/advert.py index 701f601..02f307d 100644 --- a/bluez_peripheral/advert.py +++ b/bluez_peripheral/advert.py @@ -1,18 +1,47 @@ -from typing import Collection, Dict, Optional, Union, Any import struct +from typing import Any, Collection, Dict, Optional, Union from dbus_fast import Variant -from dbus_fast.constants import PropertyAccess -from dbus_fast.service import method, dbus_property from dbus_fast.aio.message_bus import MessageBus +from dbus_fast.constants import PropertyAccess +from dbus_fast.service import dbus_property, method -from .uuid16 import UUID16, UUIDLike -from .util import _snake_to_kebab from .adapter import Adapter -from .flags import AdvertisingIncludes -from .flags import AdvertisingPacketType from .base import BaseServiceInterface, UniquePathMixin from .error import bluez_error_wrapper +from .flags import AdvertisingIncludes, AdvertisingPacketType +from .util import _snake_to_kebab +from .uuid16 import UUID16, UUIDLike + +# bluez src/advertising.c parse_min_interval / parse_max_interval: HCI slot = ms / 0.625, +# valid slots 0x20 .. 0xFFFFFF (see doc/org.bluez.LEAdvertisement.rst). +_ADV_INTERVAL_SLOT_MIN = 0x20 +_ADV_INTERVAL_SLOT_MAX = 0xFFFFFF + + +def _adv_interval_ms_to_slot(ms: int) -> int: + """Convert advertising interval from milliseconds to HCI units (matches bluez C division).""" + return int(ms / 0.625) + + +def _validate_advertising_intervals_ms(min_ms: int, max_ms: int) -> None: + min_slot = _adv_interval_ms_to_slot(min_ms) + max_slot = _adv_interval_ms_to_slot(max_ms) + if min_slot < _ADV_INTERVAL_SLOT_MIN or min_slot > _ADV_INTERVAL_SLOT_MAX: + raise ValueError( + "min_advertising_interval_ms is out of range for bluez LE advertising " + f"(HCI slot {min_slot} not in [{_ADV_INTERVAL_SLOT_MIN:#x}, {_ADV_INTERVAL_SLOT_MAX:#x}])" + ) + if max_slot < _ADV_INTERVAL_SLOT_MIN or max_slot > _ADV_INTERVAL_SLOT_MAX: + raise ValueError( + "max_advertising_interval_ms is out of range for bluez LE advertising " + f"(HCI slot {max_slot} not in [{_ADV_INTERVAL_SLOT_MIN:#x}, {_ADV_INTERVAL_SLOT_MAX:#x}])" + ) + if min_slot > max_slot: + raise ValueError( + "min_advertising_interval_ms must be <= max_advertising_interval_ms " + f"(HCI slots {min_slot} > {max_slot})" + ) class Advertisement(UniquePathMixin): @@ -34,6 +63,9 @@ class Advertisement(UniquePathMixin): includes: Fields that can be optionally included in the advertising packet. Only the :class:`bluez_peripheral.flags.AdvertisingIncludes.TX_POWER` flag seems to work correctly with bluez. duration: Duration of the advert when multiple adverts are ongoing. + min_advertising_interval_ms: Optional minimum advertising interval in milliseconds; must be set together with max. + max_advertising_interval_ms: Optional maximum advertising interval in milliseconds; must be set together with min. + See ``MinInterval`` / ``MaxInterval`` in the bluez LEAdvertisement documentation. """ _DEFAULT_PATH_PREFIX = "/com/spacecheese/bluez_peripheral/advert" @@ -140,6 +172,22 @@ def _get_includes(self) -> "as": # type: ignore def _get_duration(self) -> "q": # type: ignore return advert._duration + @dbus_property( + PropertyAccess.READ, + "MinInterval", + disabled=(advert._min_advertising_interval_ms is None), + ) + def _get_min_interval(self) -> "u": # type: ignore + return advert._min_advertising_interval_ms + + @dbus_property( + PropertyAccess.READ, + "MaxInterval", + disabled=(advert._max_advertising_interval_ms is None), + ) + def _get_max_interval(self) -> "u": # type: ignore + return advert._max_advertising_interval_ms + return _AdvertService() def __init__( @@ -156,6 +204,8 @@ def __init__( service_data: Optional[Dict[UUIDLike, bytes]] = None, includes: AdvertisingIncludes = AdvertisingIncludes.NONE, duration: Optional[int] = 2, + min_advertising_interval_ms: Optional[int] = None, + max_advertising_interval_ms: Optional[int] = None, ): self._type = packet_type # Convert any string uuids to uuid16. @@ -190,6 +240,24 @@ def __init__( self._includes = includes self._duration = duration + self._min_advertising_interval_ms: Optional[int] = None + self._max_advertising_interval_ms: Optional[int] = None + if (min_advertising_interval_ms is not None) ^ ( + max_advertising_interval_ms is not None + ): + raise ValueError( + "min_advertising_interval_ms and max_advertising_interval_ms must " + "both be set or both be omitted" + ) + if min_advertising_interval_ms is not None: + assert max_advertising_interval_ms is not None + _validate_advertising_intervals_ms( + min_advertising_interval_ms, + max_advertising_interval_ms, + ) + self._min_advertising_interval_ms = min_advertising_interval_ms + self._max_advertising_interval_ms = max_advertising_interval_ms + self._adapter: Optional[Adapter] = None self._service = self._advert_service_factory() diff --git a/tests/unit/test_advert.py b/tests/unit/test_advert.py index ad8dd68..0cdaaeb 100644 --- a/tests/unit/test_advert.py +++ b/tests/unit/test_advert.py @@ -7,6 +7,47 @@ from bluez_peripheral.flags import AdvertisingPacketType +def test_intervals_validation_partial_kwarg() -> None: + with pytest.raises(ValueError, match="both be set"): + Advertisement( + "x", + [], + min_advertising_interval_ms=100, + ) + + +def test_intervals_validation_min_gt_max() -> None: + with pytest.raises(ValueError, match="<="): + Advertisement( + "x", + [], + min_advertising_interval_ms=200, + max_advertising_interval_ms=100, + ) + + +def test_intervals_validation_slot_too_small() -> None: + # Min slot 0x20 <=> 20 ms; 19 ms is the largest integer ms with int(ms/0.625) < 0x20. + with pytest.raises(ValueError, match="out of range"): + Advertisement( + "x", + [], + min_advertising_interval_ms=20 - 1, + max_advertising_interval_ms=20, + ) + + +def test_intervals_validation_slot_too_large() -> None: + # Max slot 0xFFFFFF; 10485759 is last integer max_ms still in range, 10485760 is first over. + with pytest.raises(ValueError, match="out of range"): + Advertisement( + "x", + [], + min_advertising_interval_ms=20, + max_advertising_interval_ms=10485759 + 1, + ) + + @pytest.fixture def bus_name(): return "com.spacecheese.test" @@ -162,6 +203,54 @@ async def test_args_service_data(message_bus, bus_name, bus_path, background_adv } +@pytest.mark.asyncio +async def test_intervals_optional_omitted( + bus_name, bus_path, message_bus, background_advert +): + advert = Advertisement( + "Testing Device Name", + ["180A", "180D"], + appearance=0x0340, + timeout=2, + packet_type=AdvertisingPacketType.PERIPHERAL, + ) + background_advert(advert, path=bus_path) + + introspection = await message_bus.introspect(bus_name, bus_path) + le_iface = next( + i for i in introspection.interfaces if i.name == "org.bluez.LEAdvertisement1" + ) + prop_names = {p.name for p in le_iface.properties} + assert "MinInterval" not in prop_names + assert "MaxInterval" not in prop_names + + +@pytest.mark.asyncio +async def test_intervals_exposed(bus_name, bus_path, message_bus, background_advert): + advert = Advertisement( + "Testing Device Name", + ["180A", "180D"], + appearance=0x0340, + timeout=2, + packet_type=AdvertisingPacketType.PERIPHERAL, + min_advertising_interval_ms=100, + max_advertising_interval_ms=150, + ) + background_advert(advert, path=bus_path) + + introspection = await message_bus.introspect(bus_name, bus_path) + le_iface = next( + i for i in introspection.interfaces if i.name == "org.bluez.LEAdvertisement1" + ) + prop_names = {p.name for p in le_iface.properties} + assert "MinInterval" in prop_names + assert "MaxInterval" in prop_names + proxy_object = message_bus.get_proxy_object(bus_name, bus_path, introspection) + interface = proxy_object.get_interface("org.bluez.LEAdvertisement1") + assert await interface.get_min_interval() == 100 + assert await interface.get_max_interval() == 150 + + @pytest.mark.asyncio async def test_default_path(message_bus, bus_name, background_advert): adverts = [