Skip to content

Commit 62c8da6

Browse files
committed
Typing for bluetooth module
1 parent d56bb59 commit 62c8da6

File tree

13 files changed

+374
-134
lines changed

13 files changed

+374
-134
lines changed

term_timer/bluetooth/constants.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,37 @@
1-
# ruff: noqa: E222
1+
# ruff: noqa: E222 E501
2+
from typing import Final
23

3-
DEBOUNCE = 0.5
4+
DEBOUNCE: Final[float] = 0.5
45

5-
PREFIX = [
6+
PREFIX: Final[list[str]] = [
67
'GAN',
78
'MG',
89
'AiCube',
910
'WCU_MY32',
1011
]
1112

1213
# GAN Gen2 protocol BLE service
13-
GAN_GEN2_SERVICE = '6e400001-b5a3-f393-e0a9-e50e24dc4179'
14-
GAN_GEN2_STATE_CHARACTERISTIC = '28be4cb6-cd67-11e9-a32f-2a2ae2dbcce4'
15-
GAN_GEN2_COMMAND_CHARACTERISTIC = '28be4a4a-cd67-11e9-a32f-2a2ae2dbcce4'
14+
GAN_GEN2_SERVICE: Final[str] = '6e400001-b5a3-f393-e0a9-e50e24dc4179'
15+
GAN_GEN2_STATE_CHARACTERISTIC: Final[str] = '28be4cb6-cd67-11e9-a32f-2a2ae2dbcce4'
16+
GAN_GEN2_COMMAND_CHARACTERISTIC: Final[str] = '28be4a4a-cd67-11e9-a32f-2a2ae2dbcce4'
1617

1718
# GAN Gen3 protocol BLE service
18-
GAN_GEN3_SERVICE = '8653000a-43e6-47b7-9cb0-5fc21d4ae340'
19-
GAN_GEN3_STATE_CHARACTERISTIC = '8653000b-43e6-47b7-9cb0-5fc21d4ae340'
20-
GAN_GEN3_COMMAND_CHARACTERISTIC = '8653000c-43e6-47b7-9cb0-5fc21d4ae340'
19+
GAN_GEN3_SERVICE: Final[str] = '8653000a-43e6-47b7-9cb0-5fc21d4ae340'
20+
GAN_GEN3_STATE_CHARACTERISTIC: Final[str] = '8653000b-43e6-47b7-9cb0-5fc21d4ae340'
21+
GAN_GEN3_COMMAND_CHARACTERISTIC: Final[str] = '8653000c-43e6-47b7-9cb0-5fc21d4ae340'
2122

2223
# GAN Gen4 protocol BLE service
23-
GAN_GEN4_SERVICE = '00000010-0000-fff7-fff6-fff5fff4fff0'
24-
GAN_GEN4_STATE_CHARACTERISTIC = '0000fff6-0000-1000-8000-00805f9b34fb'
25-
GAN_GEN4_COMMAND_CHARACTERISTIC = '0000fff5-0000-1000-8000-00805f9b34fb'
24+
GAN_GEN4_SERVICE: Final[str] = '00000010-0000-fff7-fff6-fff5fff4fff0'
25+
GAN_GEN4_STATE_CHARACTERISTIC: Final[str] = '0000fff6-0000-1000-8000-00805f9b34fb'
26+
GAN_GEN4_COMMAND_CHARACTERISTIC: Final[str] = '0000fff5-0000-1000-8000-00805f9b34fb'
2627

2728
# Moyu Weilong v10 protocol BLE service
28-
MOYU_WEILONG_SERVICE = '0783b03e-7735-b5a0-1760-a305d2795cb0'
29-
MOYU_WEILONG_STATE_CHARACTERISTIC = '0783b03e-7735-b5a0-1760-a305d2795cb1'
30-
MOYU_WEILONG_COMMAND_CHARACTERISTIC = '0783b03e-7735-b5a0-1760-a305d2795cb2'
29+
MOYU_WEILONG_SERVICE: Final[str] = '0783b03e-7735-b5a0-1760-a305d2795cb0'
30+
MOYU_WEILONG_STATE_CHARACTERISTIC: Final[str] = '0783b03e-7735-b5a0-1760-a305d2795cb1'
31+
MOYU_WEILONG_COMMAND_CHARACTERISTIC: Final[str] = '0783b03e-7735-b5a0-1760-a305d2795cb2'
3132

3233
# Key used by GAN Gen2, Gen3 and Gen4 cubes
33-
GAN_ENCRYPTION_KEY = {
34+
GAN_ENCRYPTION_KEY: Final[dict[str, list[int]]] = {
3435
'key': [
3536
0x01, 0x02, 0x42, 0x28,
3637
0x31, 0x91, 0x16, 0x07,
@@ -46,7 +47,7 @@
4647
}
4748

4849
# Key used by MoYu AI 2023
49-
MOYU_AI_ENCRYPTION_KEY = {
50+
MOYU_AI_ENCRYPTION_KEY: Final[dict[str, list[int]]] = {
5051
'key': [
5152
0x05, 0x12, 0x02, 0x45,
5253
0x02, 0x01, 0x29, 0x56,
@@ -63,7 +64,7 @@
6364

6465

6566
# Key used by MoYu Weilong v10
66-
MOYU_WEILONG_ENCRYPTION_KEY = {
67+
MOYU_WEILONG_ENCRYPTION_KEY: Final[dict[str, list[int]]] = {
6768
'key': [
6869
0x15, 0x77, 0x3A, 0x5C,
6970
0x67, 0x0E, 0x2D, 0x1F,
Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,41 @@
1+
from collections.abc import Sequence
2+
from typing import Any
3+
from typing import ClassVar
4+
15
from bleak import BleakClient
26

7+
from term_timer.bluetooth.encrypter import GanGen2CubeEncrypter
8+
from term_timer.bluetooth.types import EventDict
9+
310

411
class Driver:
5-
service_uid = ''
6-
state_characteristic_uid = ''
7-
command_characteristic_uid = ''
12+
service_uid: ClassVar[str] = ''
13+
state_characteristic_uid: ClassVar[str] = ''
14+
command_characteristic_uid: ClassVar[str] = ''
815

9-
disable_gyro = True
16+
disable_gyro: bool = True
1017

1118
def __init__(self, client: BleakClient) -> None:
12-
self.client = client
19+
self.client: BleakClient = client
1320

14-
self.events: list[dict[str, object]] = []
15-
self.cypher = self.init_cypher()
21+
self.events: list[EventDict] = []
22+
self.cypher: GanGen2CubeEncrypter = self.init_cypher()
1623

17-
def init_cypher(self) -> None:
18-
pass
24+
def init_cypher(self) -> GanGen2CubeEncrypter:
25+
raise NotImplementedError
1926

20-
def send_command_handler(self, command: str) -> bool:
27+
def send_command_handler(self, command: str) -> bytes | bool:
2128
raise NotImplementedError
2229

23-
def event_handler(self, sender, data) -> list[dict[str, object]]:
30+
async def event_handler(self, sender: int, data: bytes) -> list[EventDict]:
2431
raise NotImplementedError
2532

26-
def add_event(self,
27-
store: list[dict[str, object]],
28-
event: dict[str, object] | list[dict[str, object]]) -> None:
29-
if isinstance(event, list):
30-
store.extend(event)
31-
self.events.extend(event)
33+
def add_event(self, store: list[EventDict],
34+
event: dict[str, Any] | Sequence[dict[str, Any]]) -> None:
35+
if isinstance(event, (list, tuple)):
36+
for e in event:
37+
store.append(e)
38+
self.events.append(e)
3239
else:
33-
store.append(event)
34-
self.events.append(event)
40+
store.append(event) # type: ignore[arg-type]
41+
self.events.append(event) # type: ignore[arg-type]

term_timer/bluetooth/drivers/gan_gen2.py

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import time
77
from datetime import datetime
88
from datetime import timezone
9+
from typing import ClassVar
910

11+
from bleak import BleakClient
1012
from cubing_algs.facelets import cubies_to_facelets
1113

1214
from term_timer.bluetooth.constants import GAN_ENCRYPTION_KEY
@@ -18,6 +20,7 @@
1820
from term_timer.bluetooth.encrypter import GanGen2CubeEncrypter
1921
from term_timer.bluetooth.message import GanProtocolMessage
2022
from term_timer.bluetooth.salt import get_salt
23+
from term_timer.bluetooth.types import EventDict
2124

2225
logger = logging.getLogger(__name__)
2326

@@ -33,20 +36,20 @@ class GanGen2Driver(Driver):
3336
Monster Go 3Ai
3437
MoYu AI 2023
3538
"""
36-
service_uid = GAN_GEN2_SERVICE
37-
state_characteristic_uid = GAN_GEN2_STATE_CHARACTERISTIC
38-
command_characteristic_uid = GAN_GEN2_COMMAND_CHARACTERISTIC
39-
encrypter = GanGen2CubeEncrypter
39+
service_uid: ClassVar[str] = GAN_GEN2_SERVICE
40+
state_characteristic_uid: ClassVar[str] = GAN_GEN2_STATE_CHARACTERISTIC
41+
command_characteristic_uid: ClassVar[str] = GAN_GEN2_COMMAND_CHARACTERISTIC
42+
encrypter: ClassVar[type[GanGen2CubeEncrypter]] = GanGen2CubeEncrypter
4043

41-
def __init__(self, client):
44+
def __init__(self, client: BleakClient) -> None:
4245
super().__init__(client)
4346

44-
self.last_serial = -1
45-
self.cube_timestamp = 0
46-
self.last_move_timestamp = 0
47+
self.last_serial: int = -1
48+
self.cube_timestamp: float = 0.0
49+
self.last_move_timestamp: datetime | None = None
4750

48-
def init_cypher(self):
49-
if self.client.name.startswith('AiCube'):
51+
def init_cypher(self) -> GanGen2CubeEncrypter:
52+
if self.client.name and self.client.name.startswith('AiCube'):
5053
return self.encrypter(
5154
MOYU_AI_ENCRYPTION_KEY['key'],
5255
MOYU_AI_ENCRYPTION_KEY['iv'],
@@ -58,7 +61,7 @@ def init_cypher(self):
5861
get_salt(self.client.address),
5962
)
6063

61-
def send_command_handler(self, command: str):
64+
def send_command_handler(self, command: str) -> bytes | bool:
6265
msg = bytearray(20)
6366

6467
if command == 'REQUEST_FACELETS':
@@ -78,12 +81,12 @@ def send_command_handler(self, command: str):
7881

7982
return self.cypher.encrypt(bytes(msg))
8083

81-
async def event_handler(self, sender, data): # noqa: ARG002
84+
async def event_handler(self, sender: int, data: bytes) -> list[EventDict]: # noqa: ARG002
8285
"""Process notifications from the cube"""
8386
clock = time.perf_counter_ns()
8487
timestamp = datetime.now(tz=timezone.utc) # noqa: UP017
8588

86-
events = []
89+
events: list[EventDict] = []
8790

8891
msg = GanProtocolMessage(
8992
self.cypher.decrypt(data),
@@ -140,13 +143,16 @@ async def event_handler(self, sender, data): # noqa: ARG002
140143
face = msg.get_bit_word(12 + 5 * i, 4)
141144
direction = msg.get_bit_word(16 + 5 * i, 1)
142145
move = 'URFDLB'[face] + " '"[direction]
143-
elapsed = msg.get_bit_word(47 + 16 * i, 16)
146+
elapsed_raw = msg.get_bit_word(47 + 16 * i, 16)
144147

145148
# In case of 16-bit cube timestamp register overflow
146-
if elapsed == 0 and self.last_move_timestamp:
149+
elapsed: float
150+
if elapsed_raw == 0 and self.last_move_timestamp is not None:
147151
elapsed = (
148152
timestamp - self.last_move_timestamp
149153
).total_seconds()
154+
else:
155+
elapsed = float(elapsed_raw)
150156

151157
self.cube_timestamp += elapsed
152158
payload = {

term_timer/bluetooth/drivers/gan_gen3.py

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@
44
"""
55
import logging
66
import time
7+
from collections.abc import Sequence
78
from datetime import datetime
89
from datetime import timezone
10+
from typing import Any
11+
from typing import ClassVar
12+
from typing import cast
913

14+
from bleak import BleakClient
1015
from cubing_algs.facelets import cubies_to_facelets
1116

1217
from term_timer.bluetooth.constants import DEBOUNCE
@@ -15,6 +20,8 @@
1520
from term_timer.bluetooth.constants import GAN_GEN3_STATE_CHARACTERISTIC
1621
from term_timer.bluetooth.drivers.gan_gen2 import GanGen2Driver
1722
from term_timer.bluetooth.message import GanProtocolMessage
23+
from term_timer.bluetooth.types import EventDict
24+
from term_timer.bluetooth.types import MoveEventDict
1825

1926
logger = logging.getLogger(__name__)
2027

@@ -23,19 +30,19 @@ class GanGen3Driver(GanGen2Driver):
2330
"""
2431
GAN356 i Carry 2
2532
"""
26-
service_uid = GAN_GEN3_SERVICE
27-
state_characteristic_uid = GAN_GEN3_STATE_CHARACTERISTIC
28-
command_characteristic_uid = GAN_GEN3_COMMAND_CHARACTERISTIC
33+
service_uid: ClassVar[str] = GAN_GEN3_SERVICE
34+
state_characteristic_uid: ClassVar[str] = GAN_GEN3_STATE_CHARACTERISTIC
35+
command_characteristic_uid: ClassVar[str] = GAN_GEN3_COMMAND_CHARACTERISTIC
2936

30-
def __init__(self, client):
37+
def __init__(self, client: BleakClient) -> None:
3138
super().__init__(client)
3239

33-
self.serial = -1
34-
self.last_serial = -1
35-
self.last_local_timestamp = None
36-
self.move_buffer = []
40+
self.serial: int = -1
41+
self.last_serial: int = -1
42+
self.last_local_timestamp: datetime | None = None
43+
self.move_buffer: list[MoveEventDict] = []
3744

38-
def send_command_handler(self, command: str):
45+
def send_command_handler(self, command: str) -> bytes | bool:
3946
msg = bytearray(16)
4047

4148
if command == 'REQUEST_FACELETS':
@@ -58,7 +65,7 @@ def send_command_handler(self, command: str):
5865

5966
return self.cypher.encrypt(bytes(msg))
6067

61-
async def request_move_history(self, serial, count):
68+
async def request_move_history(self, serial: int, count: int) -> None:
6269
msg = bytearray(16)
6370

6471
# Move history response data is byte-aligned,
@@ -89,8 +96,8 @@ async def request_move_history(self, serial, count):
8996
self.cypher.encrypt(bytes(msg)),
9097
)
9198

92-
async def evict_move_buffer(self):
93-
evicted_events = []
99+
async def evict_move_buffer(self) -> list[MoveEventDict]:
100+
evicted_events: list[MoveEventDict] = []
94101

95102
while len(self.move_buffer) > 0:
96103
buffer_head = self.move_buffer[0]
@@ -104,19 +111,20 @@ async def evict_move_buffer(self):
104111
self.last_serial = buffer_head['serial']
105112

106113
if len(self.move_buffer) > 16:
107-
self.client.disconnect()
114+
await self.client.disconnect()
108115

109116
return evicted_events
110117

111-
def is_serial_in_range(self, start, end, serial,
112-
*, closed_start=False, closed_end=False):
118+
def is_serial_in_range(self, start: int, end: int, serial: int, *,
119+
closed_start: bool = False,
120+
closed_end: bool = False) -> bool:
113121
return (
114122
((end - start) & 0xFF) >= ((serial - start) & 0xFF)
115123
and (closed_start or ((start - serial) & 0xFF) > 0)
116124
and (closed_end or ((end - serial) & 0xFF) > 0)
117125
)
118126

119-
def inject_missed_move_to_buffer(self, move):
127+
def inject_missed_move_to_buffer(self, move: MoveEventDict) -> None:
120128
if len(self.move_buffer) > 0:
121129
buffer_head = self.move_buffer[0]
122130

@@ -142,7 +150,7 @@ def inject_missed_move_to_buffer(self, move):
142150
):
143151
self.move_buffer.insert(0, move)
144152

145-
async def check_if_move_missed(self):
153+
async def check_if_move_missed(self) -> None:
146154
diff = (self.serial - self.last_serial) & 0xFF
147155

148156
if diff > 0 and self.serial != 0:
@@ -152,12 +160,12 @@ async def check_if_move_missed(self):
152160
) & 0xFF
153161
await self.request_move_history(start_serial, diff + 1)
154162

155-
async def event_handler(self, sender, data): # noqa: ARG002
163+
async def event_handler(self, sender: int, data: bytes) -> list[EventDict]: # noqa: ARG002
156164
"""Process notifications from the cube"""
157165
clock = time.perf_counter_ns()
158166
timestamp = datetime.now(tz=timezone.utc) # noqa: UP017
159167

160-
events = []
168+
events: list[EventDict] = []
161169

162170
msg = GanProtocolMessage(
163171
self.cypher.decrypt(data),
@@ -196,7 +204,9 @@ async def event_handler(self, sender, data): # noqa: ARG002
196204
'move': move.strip(),
197205
},
198206
)
199-
self.add_event(events, await self.evict_move_buffer())
207+
evicted = await self.evict_move_buffer()
208+
if evicted:
209+
self.add_event(events, cast(Sequence[dict[str, Any]], evicted))
200210

201211
elif event == 0x02: # Facelets
202212
serial = msg.get_bit_word(24, 16, little_endian=True)
@@ -280,7 +290,9 @@ async def event_handler(self, sender, data): # noqa: ARG002
280290
},
281291
)
282292

283-
self.add_event(events, await self.evict_move_buffer())
293+
evicted = await self.evict_move_buffer()
294+
if evicted:
295+
self.add_event(events, cast(Sequence[dict[str, Any]], evicted))
284296

285297
elif event == 0x07: # Hardware
286298
sw_major = msg.get_bit_word(72, 4)

0 commit comments

Comments
 (0)