Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.

Commit 467cc6a

Browse files
authored
Added new integration test for communication to ground (#248)
* Added new integration test for communication to ground * Changed to display actions required by the test operator * changed the queue * update with different queue * Added metadata to queue * Changed to use worker id
1 parent 1cc153d commit 467cc6a

File tree

1 file changed

+128
-0
lines changed

1 file changed

+128
-0
lines changed
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
"""
2+
Test MAVLink integration test
3+
"""
4+
5+
import multiprocessing as mp
6+
import time
7+
8+
from utilities.workers import queue_proxy_wrapper
9+
from utilities.workers import worker_controller
10+
11+
from modules.common.modules import position_global
12+
from modules.common.modules.data_encoding import message_encoding_decoding
13+
from modules.common.modules.data_encoding import metadata_encoding_decoding
14+
from modules.common.modules.data_encoding import worker_enum
15+
from modules.flight_interface import flight_interface_worker
16+
17+
18+
MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550"
19+
FLIGHT_INTERFACE_TIMEOUT = 30.0 # seconds
20+
FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate
21+
FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds
22+
WORKER_ID = worker_enum.WorkerEnum.FLIGHT_INTERFACE_WORKER
23+
24+
25+
def apply_communications_test(
26+
communications_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
27+
) -> bool:
28+
"""
29+
Method to send in hardcoded GPS coordinates to the flight interface worker
30+
"""
31+
gps_coordinates = [
32+
position_global.PositionGlobal.create(43.47321268948186, -80.53950244232878, 10), # E7
33+
position_global.PositionGlobal.create(37.7749, 122.4194, 30), # San Francisco
34+
position_global.PositionGlobal.create(40.7128, 74.0060, -5.6), # New York
35+
position_global.PositionGlobal.create(51.5072, 0.1276, 20.1), # London UK
36+
]
37+
38+
# Place the GPS coordinates
39+
print(f"Inserting list of gps coordinates, length {len(gps_coordinates)}")
40+
success, metadata = metadata_encoding_decoding.encode_metadata(WORKER_ID, len(gps_coordinates))
41+
if not success:
42+
return False
43+
44+
communications_input_queue.queue.put(metadata)
45+
46+
for success, gps_coordinate in gps_coordinates:
47+
if not success:
48+
print("ERROR: GPS Coordinate not successfully generated")
49+
return False
50+
51+
success, message = message_encoding_decoding.encode_position_global(
52+
WORKER_ID, gps_coordinate
53+
)
54+
55+
if not success:
56+
print("ERROR: Conversion from PositionGlobal to bytes failed")
57+
return False
58+
59+
communications_input_queue.queue.put(message)
60+
61+
# Wait for processing
62+
time.sleep(10)
63+
64+
# Verify that stuff is sending
65+
print(
66+
"TEST OPERATOR ACTION REQUIRED: Open mission planner's MAVLink inspector or the groundside repo (https://github.com/UWARG/statustext-parser-2025) to check for MAVLink messages"
67+
)
68+
return True
69+
70+
71+
# pylint: disable=duplicate-code
72+
def main() -> int:
73+
"""
74+
Main function
75+
"""
76+
# Setup
77+
controller = worker_controller.WorkerController()
78+
79+
mp_manager = mp.Manager()
80+
81+
in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
82+
communications_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
83+
out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
84+
home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
85+
86+
worker = mp.Process(
87+
target=flight_interface_worker.flight_interface_worker,
88+
args=(
89+
MAVLINK_CONNECTION_ADDRESS,
90+
FLIGHT_INTERFACE_TIMEOUT,
91+
FLIGHT_INTERFACE_BAUD_RATE,
92+
FLIGHT_INTERFACE_WORKER_PERIOD,
93+
in_queue,
94+
communications_input_queue,
95+
out_queue,
96+
home_position_out_queue,
97+
controller,
98+
),
99+
)
100+
101+
worker.start()
102+
103+
time.sleep(3)
104+
105+
# Test
106+
home_position = home_position_out_queue.queue.get()
107+
assert home_position is not None
108+
109+
# Run the apply_communication tests
110+
test_result = apply_communications_test(communications_input_queue)
111+
if not test_result:
112+
print("apply_communications test failed.")
113+
worker.terminate()
114+
return -1
115+
116+
# Teardown
117+
controller.request_exit()
118+
worker.join()
119+
120+
return 0
121+
122+
123+
if __name__ == "__main__":
124+
result_main = main()
125+
if result_main < 0:
126+
print(f"ERROR: Status code: {result_main}")
127+
128+
print("Done!")

0 commit comments

Comments
 (0)