Skip to content

Commit 0befb36

Browse files
committed
System test, updated monitor interface
1 parent 3e65cf5 commit 0befb36

7 files changed

Lines changed: 170 additions & 7 deletions

File tree

README.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,32 @@ A Web Interface is available to monitor which frames are pending for which devic
7575
![sw arch](images/arch.png)
7676

7777

78+
# Sizing
79+
80+
For the docker deployment, container should have the following sizing :
81+
82+
- CPU : 1 Core
83+
- RAM : 256 MB
84+
- Network : 1 Mbps
85+
- Disk space : No disk usage. Everything is stored in-memory
86+
7887
# Test
7988

89+
Tests dependencies are listed in the `requirements.txt` file under `tests` folder.
8090
Tests should always be run inside the `tests` folder for proper initialization.
8191

92+
93+
# Limitation
94+
95+
Stress tests shows the following :
96+
- At very high load (thousands of different devEUI over a minute) :
97+
- Monitor interface can fail because the `frame_buffer` object is being rendered to front-end and modified in same time.
98+
- Self-hosted MQTT broker can crash (mosquitto process)
99+
- For a very high number of pending fragment (100k different devEUI) : Monitor interface can take up to 1 mn to fully load, because everything is displayed in the front-end.
100+
101+
82102
# Roadmap
83103

84104
- HTTP Input/Output
85105
- Do some modularization to allow interfacing multiple decoders
86-
- Tests
106+
- Multi-stage docker to remove NPM dependencies (which add few hundreds of MB to the image size)

src/app/lib/validate_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
"type": "dict",
6565
"schema": {
6666
"max_chunks": {"type": "integer", "min": 1, "required": True},
67-
"timeout": {"type": "integer", "min": 1, "required": True},
67+
"timeout": {"type": "float", "min": 0.005, "required": True}, # min is 20s
6868
"lns": {"type": "string", "allowed": ["ttn", "loriot"], "required": True},
6969
},
7070
},

src/app/main.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,10 @@ def on_mqtt_message(client, userdata, message: mqtt.MQTTMessage) -> None:
144144
# We don't care about non fragmented frames
145145
if frame["fPort"] in {FRAGMENT_FPORT, LAST_FRAGMENT_FPORT}:
146146
with frame_lock:
147+
147148
if frame["devEUI"] in frame_buffer:
148149
frame_buffer[frame["devEUI"]].append(frame)
150+
logger.info(f"Received fragment from {frame['devEUI']}")
149151
else:
150152
frame_buffer[frame["devEUI"]] = [frame]
151153

@@ -172,7 +174,7 @@ def parse_mqtt(message: mqtt.MQTTMessage):
172174
exit(1)
173175

174176
if not frame:
175-
logger.debug("Could not decode the MQTT received.")
177+
logger.debug("Could not decode the MQTT received. Check that LNS is the right one in config.yaml.")
176178
raise InvalidFrame
177179

178180
return frame
@@ -208,13 +210,14 @@ def receive_http_chunk():
208210
def monitor_buffer():
209211
table_data = copy.deepcopy(frame_buffer)
210212

213+
211214
for frame_list in table_data.values():
212215
for frame in frame_list:
213216
frame["received_time_str"] = datetime.datetime.fromtimestamp(frame["received_time"]).strftime("%Y-%m-%d %H:%M:%S") # type: ignore
214217
frame["raw_hex"] = frame["raw"].hex() # type: ignore
215218

216219

217-
result = render_template("monitor.html",data=table_data)
220+
result = render_template("monitor.html",data=table_data, timeout=get_timeout(), max_chunk=get_max_chunk())
218221
return result, 200
219222

220223
######### OUTPUT #########
@@ -224,8 +227,9 @@ def monitor_buffer():
224227

225228
# MQTT : Publish
226229
def send_mqtt_message(devEUI: str, frame: dict):
227-
logger.debug(f"Publishing decoded frame for DevEUI {devEUI} to MQTT")
228-
res = client_mqtt_output.publish(config["output"]["mqtt"]["topic"]+f"/{devEUI}", json.dumps(frame))
230+
topic = config["output"]["mqtt"]["topic"]+f"/{devEUI}"
231+
logger.info(f"Publishing decoded frame for DevEUI {devEUI} to MQTT on topic {topic}")
232+
res = client_mqtt_output.publish(topic, json.dumps(frame))
229233
if res.rc == mqtt.MQTT_ERR_SUCCESS:
230234
return True
231235
else:
@@ -306,7 +310,6 @@ def init_timeout_checker():
306310

307311
def init_output():
308312
global client_mqtt_output
309-
print(config)
310313
if config["output"]["mqtt"]["enable"] == True:
311314
try:
312315
status_code = client_mqtt_output.connect(config["output"]["mqtt"]["host"], config["output"]["mqtt"]["port"])

src/app/templates/monitor.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
<h2>List of pending fragments</h2>
1414
<p>If the table is empty, it means that there are no pending frames to be re-assembled.</p>
15+
<p>Timeout configured : {{ timeout }} hours</p>
16+
<p>Max Chunk configured : {{ max_chunk }}</p>
1517

1618
<button id="refreshBtn">Refresh Page</button>
1719

tests/benchmark.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# File used to benchmark app with a lot of requests
2+
import paho.mqtt.client as mqtt
3+
from paho.mqtt.enums import CallbackAPIVersion
4+
import json
5+
import time
6+
from threading import Thread
7+
8+
9+
10+
def on_message(client, userdata, message: mqtt.MQTTMessage) -> None:
11+
decoded = json.loads(message.payload)
12+
print("message, ", decoded)
13+
14+
pass
15+
16+
17+
DEVEUI = "4200000000000000"
18+
client_mqtt = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2)
19+
client_mqtt.on_message = on_message
20+
status_code = client_mqtt.connect("localhost", 1883)
21+
client_mqtt.subscribe("output/topic/#")
22+
client_mqtt.loop_start()
23+
24+
25+
msg1 = json.load(open("payload/payload_ttn_fragment1.json"))
26+
last = json.load(open("payload/payload_ttn_fragment_last.json"))
27+
28+
29+
def scenario1():
30+
# SCENARIO 1 : Lot of proper request
31+
32+
FRAME_NUMBER = 100
33+
for i in range(FRAME_NUMBER):
34+
# Change devEUI dynamically
35+
36+
msg1["end_device_ids"]["dev_eui"] = "1Dev" +str(i)
37+
last["end_device_ids"]["dev_eui"] = "1Dev" +str(i)
38+
39+
res = client_mqtt.publish(payload=json.dumps(msg1), topic="test")
40+
res.wait_for_publish()
41+
42+
43+
res = client_mqtt.publish(payload=json.dumps(last), topic="test")
44+
res.wait_for_publish()
45+
46+
47+
def scenario2():
48+
# SCENARIO 2 : lot of pending fragments
49+
50+
FRAME_NUMBER = 10 *1000
51+
for i in range(FRAME_NUMBER):
52+
# Change devEUI dynamically
53+
54+
msg1["end_device_ids"]["dev_eui"] = "2Dev" +str(i)
55+
56+
res = client_mqtt.publish(payload=json.dumps(msg1), topic="test")
57+
res.wait_for_publish()
58+
59+
def scenario3():
60+
61+
FRAME_NUMBER = 100
62+
THREAD_COUNT = 10
63+
def spam(dev_prefix):
64+
for i in range(FRAME_NUMBER):
65+
msg1["end_device_ids"]["dev_eui"] = f"{dev_prefix}{i}"
66+
client_mqtt.publish(payload=json.dumps(msg1), topic="test").wait_for_publish()
67+
68+
threads = [Thread(target=spam, args=(f"concurrent{i}_",)) for i in range(THREAD_COUNT)]
69+
70+
for t in threads: t.start()
71+
for t in threads: t.join()
72+
73+
74+
75+
scenario3()
76+
77+
78+
# Finish sending
79+
time.sleep(1)
80+
print("Benchmark done")

tests/conftest.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,6 @@ def start_system(monkeymodule):
148148
time.sleep(1)
149149

150150
yield mosquitto_process
151+
152+
with pytest.raises(SystemExit):
153+
main.shutdown(mosquitto_process)

tests/system/test_system.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1+
import json
2+
import time
13
import pytest
24
import requests
35

46
# Project import
57
import main
8+
import paho.mqtt.client as mqtt
9+
from paho.mqtt.enums import CallbackAPIVersion
10+
from tests.static import EXAMPLE_CONFIG, wait_until, DATAFORMAT2_DECODED
611

712
@pytest.mark.usefixtures("start_system")
813
class TestSystem:
@@ -12,6 +17,56 @@ def test_system_monitor(self,start_system):
1217
assert requests.get(f'http://localhost:{main.config["input"]["http"]["port"]}/monitor').status_code == 200
1318

1419

20+
def test_frame_accepted(self,start_system):
21+
"""Check that a TTN frame is accepted and seen in monitor page"""
22+
23+
frame_received = None
24+
25+
def on_message(client, userdata, message: mqtt.MQTTMessage) -> None:
26+
nonlocal frame_received
27+
decoded = json.loads(message.payload)
28+
29+
# It means we recieved a true decoded frame
30+
if "preset_id" in decoded["data"]:
31+
frame_received = decoded
32+
pass
33+
34+
DEVEUI = "4200000000000000"
35+
client_mqtt = mqtt.Client(callback_api_version=CallbackAPIVersion.VERSION2)
36+
client_mqtt.on_message = on_message
37+
status_code = client_mqtt.connect("localhost", EXAMPLE_CONFIG["output"]["mqtt"]["port"])
38+
client_mqtt.subscribe(f"output/{DEVEUI}")
39+
client_mqtt.loop_start()
40+
41+
42+
assert status_code == mqtt.MQTT_ERR_SUCCESS
43+
44+
45+
assert DEVEUI not in main.frame_buffer
46+
47+
# Fragment 1
48+
raw_msg = json.load(open("payload/payload_ttn_fragment1.json"))
49+
client_mqtt.publish(payload=json.dumps(raw_msg), topic=EXAMPLE_CONFIG["input"]["mqtt"]["topic"])
50+
51+
52+
assert wait_until(lambda: DEVEUI in main.frame_buffer)
53+
assert len(main.frame_buffer[DEVEUI]) == 1
54+
55+
# Fragment 2
56+
raw_msg = json.load(open("payload/payload_ttn_fragment_last.json"))
57+
client_mqtt.publish(payload=json.dumps(raw_msg), topic=EXAMPLE_CONFIG["input"]["mqtt"]["topic"])
58+
59+
60+
# No longer fragmented
61+
assert wait_until(lambda: DEVEUI not in main.frame_buffer)
62+
63+
# We received the answer, decoded
64+
assert wait_until(lambda: frame_received == DATAFORMAT2_DECODED)
65+
66+
67+
68+
69+
1570

1671
def test_stop_system(self,start_system):
1772
"""Check that launch func launch every services"""

0 commit comments

Comments
 (0)