-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcommon.py
More file actions
51 lines (42 loc) · 1.66 KB
/
common.py
File metadata and controls
51 lines (42 loc) · 1.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import os
from dotenv import load_dotenv
load_dotenv()
# Connection settings, read from the process environment with local defaults.
# BROKER_PORT is converted to int at import time, so a non-numeric value in
# the environment raises ValueError when this module is loaded.
BROKER_HOST = os.environ.get("BROKER_HOST", "127.0.0.1")
BROKER_PORT = int(os.environ.get("BROKER_PORT", "1883"))
# Identifier used as the default device segment of every topic and as the
# value sent in the registration payload.
DEVICE_UUID = os.environ.get("DEVICE_UUID", "healthcaredispenser")
def topic_base(uuid: str | None = None) -> str:
return f"dispenser/{(uuid or DEVICE_UUID)}"
def topics(uuid: str | None = None) -> dict:
    """Build the table of topic strings for one device.

    Args:
        uuid: Device identifier passed through to ``topic_base``; when
            omitted, ``topic_base`` applies its own default.

    Returns:
        A dict mapping a short role key to the full topic string. Keys are
        prefixed ``pub_`` or ``sub_`` (presumably topics this device
        publishes to and subscribes to, respectively; confirm against the
        caller).
    """
    prefix = topic_base(uuid)
    # (key, suffix) pairs; order is kept so the resulting dict iterates in
    # the same order as the keys are listed here.
    suffix_by_key = (
        ("pub_register", "/register"),
        ("pub_command_resp", "/command/response"),
        ("pub_wash_resp", "/wash/response"),
        ("sub_register_resp", "/register/response"),
        ("sub_command", "/command"),
        ("sub_wash", "/wash"),
    )
    return {key: prefix + suffix for key, suffix in suffix_by_key}
# The server expects the identifier under the 'uuid' key, so the key name is
# fixed here in one place.
def build_register_payload() -> dict:
    """Return the registration payload carrying this device's UUID.

    Returns:
        A dict with the single key ``"uuid"`` set to the module-level
        ``DEVICE_UUID``.
    """
    payload = dict(uuid=DEVICE_UUID)
    return payload
def build_command_response(command_uuid: str, status: str) -> dict:
    """Return the response payload for a handled command.

    Args:
        command_uuid: Identifier of the command being answered; stored
            under the ``"commandUuid"`` key.
        status: Status value to report; stored unchanged under ``"status"``.

    Returns:
        A dict with the keys ``"commandUuid"`` and ``"status"``.
    """
    response = dict(commandUuid=command_uuid, status=status)
    return response
def build_wash_response(slot: int, status: str) -> dict:
    """Return the response payload for a wash request.

    Args:
        slot: Slot number; passed through ``int()`` so numeric strings and
            floats are accepted.
        status: Status value to report; stored unchanged under ``"status"``.

    Returns:
        A dict with the keys ``"slot"`` (always an ``int``) and ``"status"``.

    Raises:
        ValueError: If ``slot`` is a string that ``int()`` cannot parse.
        TypeError: If ``slot`` is of a type ``int()`` does not accept.
    """
    slot_number = int(slot)
    return dict(slot=slot_number, status=status)
def _to_float(v) -> float:
    """Convert ``v`` to ``float`` on a best-effort basis.

    Args:
        v: Any value; typically a number, numeric string, or ``None``.

    Returns:
        ``float(v)`` when the conversion succeeds, otherwise ``0.0``.
    """
    try:
        return float(v)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / list / dict; ValueError: non-numeric string;
        # OverflowError: an int too large for a float. All are treated as
        # "no usable value" rather than as a fatal error.
        return 0.0


def parse_command_payload(data: dict) -> dict:
    """Normalize an incoming command payload into a fixed-shape dict.

    Args:
        data: Decoded payload. Anything that is not a ``dict`` is rejected.

    Returns:
        ``{}`` when ``data`` is not a dict. Otherwise a dict with the keys:

        * ``"commandUuid"``: copied as-is (``None`` when absent).
        * ``"commandType"``: upper-cased text; ``"DISPENSE"`` when the
          field is missing or falsy.
        * ``"zinc"``, ``"melatonin"``, ``"magnesium"``, ``"electrolyte"``:
          floats, ``0.0`` when missing or not convertible.
        * ``"slot"``: copied as-is (``None`` when absent).
    """
    if not isinstance(data, dict):
        return {}

    # Coerce through str() before upper-casing so a truthy non-string value
    # (e.g. a number) yields its text form instead of raising AttributeError.
    command_type = str(data.get("commandType") or "DISPENSE").upper()

    return {
        "commandUuid": data.get("commandUuid"),
        "commandType": command_type,
        "zinc": _to_float(data.get("zinc")),
        "melatonin": _to_float(data.get("melatonin")),
        "magnesium": _to_float(data.get("magnesium")),
        "electrolyte": _to_float(data.get("electrolyte")),
        "slot": data.get("slot"),
    }