-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathasync_executor.py
More file actions
61 lines (47 loc) · 2.5 KB
/
async_executor.py
File metadata and controls
61 lines (47 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import asyncio
import os
import re
import requests
from loguru import logger
from EventToInternet.KeyboardListener import EventDict, KeyboardListener
from EventToInternet.logging_config import CONSOLE_LOGGING_CONFIG, FILE_LOGGING_CONFIG
# apply logging configuration
logger.configure(handlers=[CONSOLE_LOGGING_CONFIG, FILE_LOGGING_CONFIG])
API_RFID_ENDPOINT = os.getenv("HID_EVENT__RFID_ENDPOINT", "http://127.0.0.1:5001/employee/handle-rfid-event")
API_BARCODE_ENDPOINT = os.getenv("HID_EVENT__BARCODE_ENDPOINT", "http://127.0.0.1:5001/workbench/handle-barcode-event")
def is_a_ean13_barcode(string: str) -> bool:
"""define if the barcode scanner input is a valid EAN13 barcode"""
return bool(re.fullmatch(r"\d{13}", string))
def identify_sender(event_dict: EventDict) -> EventDict:
known_hid_devices: dict[str, str] = {
"rfid_reader": os.getenv("HID_DEVICES__RFID_READER", "Sample"),
"barcode_reader": os.getenv("HID_DEVICES__BARCODE_READER", "Sample"),
}
for sender_name, device_name in known_hid_devices.items():
if device_name == event_dict["name"]:
if sender_name == "barcode_reader" and not is_a_ean13_barcode(event_dict["string"]):
logger.warning(f"{event_dict['string']} is not a EAN13 barcode and cannot be an internal unit ID.")
return event_dict
event_dict["name"] = sender_name
return event_dict
class HidEventListener(KeyboardListener):
"""listen to a barcode and RFID reader in the background"""
async def dict_handler(self, event_dict: EventDict) -> None:
"""handle RFID or barcode scan event and post data to a Rest API endpoint"""
logger.debug(f"Handling event: {event_dict}")
event_dict = identify_sender(event_dict)
match event_dict["name"]:
case "rfid_reader":
logger.debug(f'Handling RFID event. String: {event_dict["string"]}')
requests.post(url=API_RFID_ENDPOINT, json=event_dict)
logger.info(f"Event relayed to endpoint {API_RFID_ENDPOINT}")
case "barcode_reader":
logger.debug(f'Handling barcode event. String: {event_dict["string"]}')
requests.post(url=API_BARCODE_ENDPOINT, json=event_dict)
logger.info(f"Event relayed to endpoint {API_BARCODE_ENDPOINT}")
case _:
logger.warning(f'Unknown device: {event_dict["name"]}.')
if __name__ == "__main__":
HidEventListener()
loop = asyncio.get_event_loop()
loop.run_forever()