diff --git a/switchbot/__init__.py b/switchbot/__init__.py index 8511ebd9..f5981fb6 100644 --- a/switchbot/__init__.py +++ b/switchbot/__init__.py @@ -52,6 +52,7 @@ SwitchbotStripLight3, ) from .devices.lock import SwitchbotLock +from .devices.meter_pro import SwitchbotMeterProCO2 from .devices.plug import SwitchbotPlugMini from .devices.relay_switch import ( SwitchbotGarageDoorOpener, @@ -101,6 +102,7 @@ "SwitchbotKeypadVision", "SwitchbotLightStrip", "SwitchbotLock", + "SwitchbotMeterProCO2", "SwitchbotModel", "SwitchbotModel", "SwitchbotOperationError", diff --git a/switchbot/devices/meter_pro.py b/switchbot/devices/meter_pro.py new file mode 100644 index 00000000..4f4fbbda --- /dev/null +++ b/switchbot/devices/meter_pro.py @@ -0,0 +1,172 @@ +from typing import Any + +from ..helpers import parse_uint24_be +from .device import SwitchbotDevice, SwitchbotOperationError + +COMMAND_SET_TIME_OFFSET = "570f680506" +COMMAND_GET_TIME_OFFSET = "570f690506" +MAX_TIME_OFFSET = (1 << 24) - 1 + +COMMAND_GET_DEVICE_DATETIME = "570f6901" +COMMAND_SET_DEVICE_DATETIME = "57000503" +COMMAND_SET_DISPLAY_FORMAT = "570f680505" + + +class SwitchbotMeterProCO2(SwitchbotDevice): + """API to control Switchbot Meter Pro CO2.""" + + async def get_time_offset(self) -> int: + """ + Get the current display time offset from the device. + + Returns: + int: The time offset in seconds. Max 24 bits. + + """ + # Response Format: 5 bytes, where + # - byte 0: "01" (success) + # - byte 1: "00" (plus offset) or "80" (minus offset) + # - bytes 2-4: int24, number of seconds to offset. + # Example response: 01-80-00-10-00 -> subtract 4096 seconds. + result = await self._send_command(COMMAND_GET_TIME_OFFSET) + result = self._validate_result("get_time_offset", result, min_length=5) + + is_negative = bool(result[1] & 0b10000000) + offset = parse_uint24_be(result, 2) + return -offset if is_negative else offset + + async def set_time_offset(self, offset_seconds: int) -> None: + """ + Set the display time offset on the device. This is what happens when + you adjust display time in the Switchbot app. The displayed time is + calculated as the internal device time (usually comes from the factory + settings or set by the Switchbot app upon syncing) + offset. The offset + is provided in seconds and can be positive or negative. + + Args: + offset_seconds (int): 2^24 maximum, can be negative. + + """ + abs_offset = abs(offset_seconds) + if abs_offset > MAX_TIME_OFFSET: + raise SwitchbotOperationError( + f"{self.name}: Requested to set_time_offset of {offset_seconds} seconds, allowed +-{MAX_TIME_OFFSET} max." + ) + + sign_byte = "80" if offset_seconds < 0 else "00" + + # Example: 57-0f-68-05-06-80-00-10-00 -> subtract 4096 seconds. + payload = f"{COMMAND_SET_TIME_OFFSET}{sign_byte}{abs_offset:06x}" + result = await self._send_command(payload) + self._validate_result("set_time_offset", result) + + async def get_datetime(self) -> dict[str, Any]: + """ + Get the current device time and settings as it is displayed. Contains + a time offset, if any was applied (see set_time_offset). + Doesn't include the current time zone. + + Returns: + dict: Dictionary containing: + - 12h_mode (bool): True if 12h mode, False if 24h mode. + - year (int) + - month (int) + - day (int) + - hour (int) + - minute (int) + - second (int) + + """ + # Response Format: 13 bytes, where + # - byte 0: "01" (success) + # - bytes 1-4: temperature, ignored here. + # - byte 5: time display format: + # - "80" - 12h (am/pm) + # - "00" - 24h + # - bytes 6-12: yyyy-MM-dd-hh-mm-ss + # Example: 01-e4-02-94-23-00-07-e9-0c-1e-08-37-01 contains + # "year 2025, 30 December, 08:55:01, displayed in 24h format". + result = await self._send_command(COMMAND_GET_DEVICE_DATETIME) + result = self._validate_result("get_datetime", result, min_length=13) + return { + # Whether the time is displayed in 12h(am/pm) or 24h mode. + "12h_mode": bool(result[5] & 0b10000000), + "year": (result[6] << 8) + result[7], + "month": result[8], + "day": result[9], + "hour": result[10], + "minute": result[11], + "second": result[12], + } + + async def set_datetime( + self, timestamp: int, utc_offset_hours: int = 0, utc_offset_minutes: int = 0 + ) -> None: + """ + Set the device internal time and timezone. Similar to how the + Switchbot app does it upon syncing with the device. + Pay attention to calculating UTC offset hours and minutes, see + examples below. + + Args: + timestamp (int): Unix timestamp in seconds. + utc_offset_hours (int): UTC offset in hours, floor()'ed, + within [-12; 14] range. + Examples: -5 for UTC-05:00, -6 for UTC-05:30, + 5 for UTC+05:00, 5 for UTC+5:30. + utc_offset_minutes (int): UTC offset minutes component, always + positive, complements utc_offset_hours. + Examples: 45 for UTC+05:45, 15 for UTC-5:45. + + """ + if not (-12 <= utc_offset_hours <= 14): + raise SwitchbotOperationError( + f"{self.name}: utc_offset_hours must be between -12 and +14 inclusive, got {utc_offset_hours}" + ) + if not (0 <= utc_offset_minutes < 60): + raise SwitchbotOperationError( + f"{self.name}: utc_offset_minutes must be between 0 and 59 inclusive, got {utc_offset_minutes}" + ) + + # The device doesn't automatically add offset minutes, it expects them + # to come as a part of the timestamp. + adjusted_timestamp = timestamp + utc_offset_minutes * 60 + + # The timezone is encoded as 1 byte, where 00 stands for UTC-12. + # TZ with minute offset gets floor()ed: 4:30 yields 4, -4:30 yields -5. + utc_byte = utc_offset_hours + 12 + + payload = ( + f"{COMMAND_SET_DEVICE_DATETIME}{utc_byte:02x}" + f"{adjusted_timestamp:016x}{utc_offset_minutes:02x}" + ) + + result = await self._send_command(payload) + self._validate_result("set_datetime", result) + + async def set_time_display_format(self, is_12h_mode: bool = False) -> None: + """ + Set the time display format on the device: 12h(AM/PM) or 24h. + + Args: + is_12h_mode (bool): True for 12h (AM/PM) mode, False for 24h mode. + + """ + mode_byte = "80" if is_12h_mode else "00" + payload = f"{COMMAND_SET_DISPLAY_FORMAT}{mode_byte}" + result = await self._send_command(payload) + self._validate_result("set_time_display_format", result) + + def _validate_result( + self, op_name: str, result: bytes | None, min_length: int | None = None + ) -> bytes: + if not self._check_command_result(result, 0, {1}): + raise SwitchbotOperationError( + f"{self.name}: Unexpected response code for {op_name} (result={result.hex() if result else 'None'} rssi={self.rssi})" + ) + assert result is not None + if min_length is not None and len(result) < min_length: + raise SwitchbotOperationError( + f"{self.name}: Unexpected response len for {op_name}, wanted at least {min_length} (result={result.hex() if result else 'None'} rssi={self.rssi})" + ) + return result diff --git a/tests/test_meter_pro.py b/tests/test_meter_pro.py new file mode 100644 index 00000000..449a8636 --- /dev/null +++ b/tests/test_meter_pro.py @@ -0,0 +1,249 @@ +from unittest.mock import AsyncMock + +import pytest +from bleak.backends.device import BLEDevice + +from switchbot import SwitchbotOperationError +from switchbot.devices.meter_pro import MAX_TIME_OFFSET, SwitchbotMeterProCO2 + + +def create_device(): + ble_device = BLEDevice( + address="aa:bb:cc:dd:ee:ff", name="any", details={"rssi": -80} + ) + device = SwitchbotMeterProCO2(ble_device) + device._send_command = AsyncMock() + return device + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ( + "device_response", + "expected_offset", + ), + [ + ("0100101bc9", 1055689), # 01 (success) 00 (plus offset) 10 1b c9 (1055689) + ("0180101bc9", -1055689), # 01 (success) 80 (minus offset) 10 1b c9 (1055689) + ], +) +async def test_get_time_offset(device_response: str, expected_offset: int): + device = create_device() + device._send_command.return_value = bytes.fromhex(device_response) + + offset = await device.get_time_offset() + device._send_command.assert_called_with("570f690506") + assert offset == expected_offset + + +@pytest.mark.asyncio +async def test_get_time_offset_failure(): + device = create_device() + # Invalid 1st byte + device._send_command.return_value = bytes.fromhex("0080101bc9") + + with pytest.raises(SwitchbotOperationError): + await device.get_time_offset() + device._send_command.assert_called_with("570f690506") + + +@pytest.mark.asyncio +async def test_get_time_offset_wrong_response(): + device = create_device() + # Response too short (only status byte returned) + device._send_command.return_value = bytes.fromhex("01") + + with pytest.raises(SwitchbotOperationError): + await device.get_time_offset() + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ( + "offset_sec", + "expected_payload", + ), + [ + (1055689, "00101bc9"), # "00" for positive offset, 101bc9 for 1055689 + (-4096, "80001000"), # "80" for negative offset, 001000 for 4096 + (0, "00000000"), + (-0, "00000000"), # -0 == 0 in Python + ], +) +async def test_set_time_offset(offset_sec: int, expected_payload: str): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_time_offset(offset_sec) + device._send_command.assert_called_with("570f680506" + expected_payload) + + +@pytest.mark.asyncio +async def test_set_time_offset_too_large(): + device = create_device() + with pytest.raises(SwitchbotOperationError): + await device.set_time_offset(MAX_TIME_OFFSET + 1) + + with pytest.raises(SwitchbotOperationError): + await device.set_time_offset(-(MAX_TIME_OFFSET + 1)) + + +@pytest.mark.asyncio +async def test_set_time_offset_failure(): + device = create_device() + device._send_command.return_value = bytes.fromhex("00") + + with pytest.raises(SwitchbotOperationError): + await device.set_time_offset(100) + + +@pytest.mark.asyncio +async def test_get_datetime_success(): + device = create_device() + # Mock response: + # byte 0: 01 (success) + # bytes 1-4: e4 02 94 23 (temp, ignored) + # byte 5: 00 (24h mode) + # bytes 6-7: 07 e9 (year 2025) + # byte 8: 0c (Dec) + # byte 9: 1e (30) + # byte 10: 08 (Hour) + # byte 11: 37 (Minute = 55) + # byte 12: 01 (Second) + response_hex = "01e40294230007e90c1e083701" + device._send_command.return_value = bytes.fromhex(response_hex) + + result = await device.get_datetime() + device._send_command.assert_called_with("570f6901") + + assert result["12h_mode"] is False + assert result["year"] == 2025 + assert result["month"] == 12 + assert result["day"] == 30 + assert result["hour"] == 8 + assert result["minute"] == 55 + assert result["second"] == 1 + + +@pytest.mark.asyncio +async def test_get_datetime_12h_mode(): + device = create_device() + # byte 5: 80 (12h mode) + # Time: 12:00:00 + response_hex = "010000000080000001010c0000" + device._send_command.return_value = bytes.fromhex(response_hex) + + result = await device.get_datetime() + device._send_command.assert_called_with("570f6901") + + assert result["12h_mode"] is True + assert result["year"] == 0 + assert result["month"] == 1 + assert result["day"] == 1 + assert result["hour"] == 12 + assert result["minute"] == 0 + assert result["second"] == 0 + + +@pytest.mark.asyncio +async def test_get_datetime_failure(): + device = create_device() + device._send_command.return_value = bytes.fromhex("00") + + with pytest.raises(SwitchbotOperationError): + await device.get_datetime() + + +@pytest.mark.asyncio +async def test_get_datetime_wrong_response(): + device = create_device() + device._send_command.return_value = bytes.fromhex("0100") + + with pytest.raises(SwitchbotOperationError): + await device.get_datetime() + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ( + "timestamp", + "utc_offset_hours", + "utc_offset_minutes", + "expected_ts", + "expected_utc", + "expected_min", + ), + [ + (1709251200, 0, 0, "65e11a80", "0c", "00"), # 2024-03-01T00:00:00+00:00 + (1709251200, 1, 0, "65e11a80", "0d", "00"), # 2024-03-01T00:00:00+01:00 + (1709251200, 5, 45, "65e1250c", "11", "2d"), # 2024-03-01T00:00:00+05:45 + (1709251200, -6, 15, "65e11e04", "06", "0f"), # 2024-03-01T00:00:00-05:45 + ], +) +async def test_set_datetime( # noqa: PLR0913 + timestamp: int, + utc_offset_hours: int, + utc_offset_minutes: int, + expected_ts: str, + expected_utc: str, + expected_min: str, +): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_datetime( + timestamp, + utc_offset_hours=utc_offset_hours, + utc_offset_minutes=utc_offset_minutes, + ) + + expected_ts = expected_ts.zfill(16) + expected_payload = "57000503" + expected_utc + expected_ts + expected_min + device._send_command.assert_called_with(expected_payload) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "bad_hour", + [-13, 15], +) +async def test_set_datetime_invalid_utc_offset_hours(bad_hour: int): + device = create_device() + with pytest.raises(SwitchbotOperationError): + await device.set_datetime(1709251200, utc_offset_hours=bad_hour) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "bad_min", + [-1, 60], +) +async def test_set_datetime_invalid_utc_offset_minutes(bad_min: int): + device = create_device() + with pytest.raises(SwitchbotOperationError): + await device.set_datetime(1709251200, utc_offset_minutes=bad_min) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("is_12h_mode", "expected_payload"), + [ + (True, "80"), + (False, "00"), + ], +) +async def test_set_time_display_format(is_12h_mode: bool, expected_payload: str): + device = create_device() + device._send_command.return_value = bytes.fromhex("01") + + await device.set_time_display_format(is_12h_mode=is_12h_mode) + device._send_command.assert_called_with("570f680505" + expected_payload) + + +@pytest.mark.asyncio +async def test_set_time_display_format_failure(): + device = create_device() + device._send_command.return_value = bytes.fromhex("00") + + with pytest.raises(SwitchbotOperationError): + await device.set_time_display_format(is_12h_mode=True)