Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 75 additions & 7 deletions bluez_peripheral/advert.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,47 @@
from typing import Collection, Dict, Optional, Union, Any
import struct
from typing import Any, Collection, Dict, Optional, Union

from dbus_fast import Variant
from dbus_fast.constants import PropertyAccess
from dbus_fast.service import method, dbus_property
from dbus_fast.aio.message_bus import MessageBus
from dbus_fast.constants import PropertyAccess
from dbus_fast.service import dbus_property, method

from .uuid16 import UUID16, UUIDLike
from .util import _snake_to_kebab
from .adapter import Adapter
from .flags import AdvertisingIncludes
from .flags import AdvertisingPacketType
from .base import BaseServiceInterface, UniquePathMixin
from .error import bluez_error_wrapper
from .flags import AdvertisingIncludes, AdvertisingPacketType
from .util import _snake_to_kebab
from .uuid16 import UUID16, UUIDLike

# bluez src/advertising.c parse_min_interval / parse_max_interval: HCI slot = ms / 0.625,
# valid slots 0x20 .. 0xFFFFFF (see doc/org.bluez.LEAdvertisement.rst).
_ADV_INTERVAL_SLOT_MIN = 0x20
_ADV_INTERVAL_SLOT_MAX = 0xFFFFFF


def _adv_interval_ms_to_slot(ms: int) -> int:
"""Convert advertising interval from milliseconds to HCI units (matches bluez C division)."""
return int(ms / 0.625)


def _validate_advertising_intervals_ms(min_ms: int, max_ms: int) -> None:
min_slot = _adv_interval_ms_to_slot(min_ms)
max_slot = _adv_interval_ms_to_slot(max_ms)
if min_slot < _ADV_INTERVAL_SLOT_MIN or min_slot > _ADV_INTERVAL_SLOT_MAX:
raise ValueError(
"min_advertising_interval_ms is out of range for bluez LE advertising "
f"(HCI slot {min_slot} not in [{_ADV_INTERVAL_SLOT_MIN:#x}, {_ADV_INTERVAL_SLOT_MAX:#x}])"
)
if max_slot < _ADV_INTERVAL_SLOT_MIN or max_slot > _ADV_INTERVAL_SLOT_MAX:
raise ValueError(
"max_advertising_interval_ms is out of range for bluez LE advertising "
f"(HCI slot {max_slot} not in [{_ADV_INTERVAL_SLOT_MIN:#x}, {_ADV_INTERVAL_SLOT_MAX:#x}])"
)
if min_slot > max_slot:
raise ValueError(
"min_advertising_interval_ms must be <= max_advertising_interval_ms "
f"(HCI slots {min_slot} > {max_slot})"
)


class Advertisement(UniquePathMixin):
Expand All @@ -34,6 +63,9 @@ class Advertisement(UniquePathMixin):
includes: Fields that can be optionally included in the advertising packet.
Only the :class:`bluez_peripheral.flags.AdvertisingIncludes.TX_POWER` flag seems to work correctly with bluez.
duration: Duration of the advert when multiple adverts are ongoing.
min_advertising_interval_ms: Optional minimum advertising interval in milliseconds; must be set together with max.
max_advertising_interval_ms: Optional maximum advertising interval in milliseconds; must be set together with min.
See ``MinInterval`` / ``MaxInterval`` in the bluez LEAdvertisement documentation.
"""

_DEFAULT_PATH_PREFIX = "/com/spacecheese/bluez_peripheral/advert"
Expand Down Expand Up @@ -140,6 +172,22 @@ def _get_includes(self) -> "as": # type: ignore
def _get_duration(self) -> "q": # type: ignore
return advert._duration

@dbus_property(
PropertyAccess.READ,
"MinInterval",
disabled=(advert._min_advertising_interval_ms is None),
)
def _get_min_interval(self) -> "u": # type: ignore
return advert._min_advertising_interval_ms

@dbus_property(
PropertyAccess.READ,
"MaxInterval",
disabled=(advert._max_advertising_interval_ms is None),
)
def _get_max_interval(self) -> "u": # type: ignore
return advert._max_advertising_interval_ms

return _AdvertService()

def __init__(
Expand All @@ -156,6 +204,8 @@ def __init__(
service_data: Optional[Dict[UUIDLike, bytes]] = None,
includes: AdvertisingIncludes = AdvertisingIncludes.NONE,
duration: Optional[int] = 2,
min_advertising_interval_ms: Optional[int] = None,
max_advertising_interval_ms: Optional[int] = None,
):
self._type = packet_type
# Convert any string uuids to uuid16.
Expand Down Expand Up @@ -190,6 +240,24 @@ def __init__(
self._includes = includes
self._duration = duration

self._min_advertising_interval_ms: Optional[int] = None
self._max_advertising_interval_ms: Optional[int] = None
if (min_advertising_interval_ms is not None) ^ (
max_advertising_interval_ms is not None
):
raise ValueError(
"min_advertising_interval_ms and max_advertising_interval_ms must "
"both be set or both be omitted"
)
if min_advertising_interval_ms is not None:
assert max_advertising_interval_ms is not None
_validate_advertising_intervals_ms(
min_advertising_interval_ms,
max_advertising_interval_ms,
)
self._min_advertising_interval_ms = min_advertising_interval_ms
self._max_advertising_interval_ms = max_advertising_interval_ms

self._adapter: Optional[Adapter] = None
self._service = self._advert_service_factory()

Expand Down
89 changes: 89 additions & 0 deletions tests/unit/test_advert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,47 @@
from bluez_peripheral.flags import AdvertisingPacketType


def test_intervals_validation_partial_kwarg() -> None:
with pytest.raises(ValueError, match="both be set"):
Advertisement(
"x",
[],
min_advertising_interval_ms=100,
)


def test_intervals_validation_min_gt_max() -> None:
with pytest.raises(ValueError, match="<="):
Advertisement(
"x",
[],
min_advertising_interval_ms=200,
max_advertising_interval_ms=100,
)


def test_intervals_validation_slot_too_small() -> None:
# Min slot 0x20 <=> 20 ms; 19 ms is the largest integer ms with int(ms/0.625) < 0x20.
with pytest.raises(ValueError, match="out of range"):
Advertisement(
"x",
[],
min_advertising_interval_ms=20 - 1,
max_advertising_interval_ms=20,
)


def test_intervals_validation_slot_too_large() -> None:
# Max slot 0xFFFFFF; 10485759 is last integer max_ms still in range, 10485760 is first over.
with pytest.raises(ValueError, match="out of range"):
Advertisement(
"x",
[],
min_advertising_interval_ms=20,
max_advertising_interval_ms=10485759 + 1,
)


@pytest.fixture
def bus_name():
return "com.spacecheese.test"
Expand Down Expand Up @@ -162,6 +203,54 @@ async def test_args_service_data(message_bus, bus_name, bus_path, background_adv
}


@pytest.mark.asyncio
async def test_intervals_optional_omitted(
bus_name, bus_path, message_bus, background_advert
):
advert = Advertisement(
"Testing Device Name",
["180A", "180D"],
appearance=0x0340,
timeout=2,
packet_type=AdvertisingPacketType.PERIPHERAL,
)
background_advert(advert, path=bus_path)

introspection = await message_bus.introspect(bus_name, bus_path)
le_iface = next(
i for i in introspection.interfaces if i.name == "org.bluez.LEAdvertisement1"
)
prop_names = {p.name for p in le_iface.properties}
assert "MinInterval" not in prop_names
assert "MaxInterval" not in prop_names


@pytest.mark.asyncio
async def test_intervals_exposed(bus_name, bus_path, message_bus, background_advert):
advert = Advertisement(
"Testing Device Name",
["180A", "180D"],
appearance=0x0340,
timeout=2,
packet_type=AdvertisingPacketType.PERIPHERAL,
min_advertising_interval_ms=100,
max_advertising_interval_ms=150,
)
background_advert(advert, path=bus_path)

introspection = await message_bus.introspect(bus_name, bus_path)
le_iface = next(
i for i in introspection.interfaces if i.name == "org.bluez.LEAdvertisement1"
)
prop_names = {p.name for p in le_iface.properties}
assert "MinInterval" in prop_names
assert "MaxInterval" in prop_names
proxy_object = message_bus.get_proxy_object(bus_name, bus_path, introspection)
interface = proxy_object.get_interface("org.bluez.LEAdvertisement1")
assert await interface.get_min_interval() == 100
assert await interface.get_max_interval() == 150


@pytest.mark.asyncio
async def test_default_path(message_bus, bus_name, background_advert):
adverts = [
Expand Down
Loading