Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions modules/hitl/camera_emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import os
import time
import pyvirtualcam
import cv2

Expand All @@ -22,18 +23,34 @@ class CameraEmulator:
__create_key = object()

@classmethod
def create(cls, images_path: str) -> "tuple[True, CameraEmulator] | tuple[False, None]":
def create(
cls, images_path: str, time_between_images: float = 1.0
) -> "tuple[True, CameraEmulator] | tuple[False, None]":
"""
Setup camera emulator.

Args:
images_path: Path to the directory containing images for the camera emulator. Cycles through these images to simulate camera input (every 1 second).
time_between_images: Time in seconds between image changes.

Returns:
Success, CameraEmulator instance.
"""

if not isinstance(images_path, str):
print("Images path is not a string")
return False, None

if not os.path.isdir(images_path):
print("Images path is not a valid directory")
return False, None

if not isinstance(time_between_images, (int, float)):
print("Time between images is not a number")
return False, None

if time_between_images <= 0:
print("Time between images must be positive")
return False, None

try:
Expand All @@ -51,10 +68,16 @@ def create(cls, images_path: str) -> "tuple[True, CameraEmulator] | tuple[False,
if virtual_camera_instance is None:
return False, None

return True, CameraEmulator(cls.__create_key, images_path, virtual_camera_instance)
return True, CameraEmulator(
cls.__create_key, images_path, time_between_images, virtual_camera_instance
)

def __init__(
self, class_private_create_key: object, images_path: str, virtual_camera: pyvirtualcam
self,
class_private_create_key: object,
images_path: str,
time_between_images: float,
virtual_camera: pyvirtualcam,
) -> None:
"""
Private constructor, use create() method.
Expand All @@ -66,10 +89,33 @@ def __init__(
self.__image_paths: "list[str]" = []
self.__current_frame = None
self.__image_index = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we make next image time a parameter?

self.__next_image_time = time.time() + time_between_images
self.__time_between_images = time_between_images

self.__get_images()
self.update_current_image()

def periodic(self) -> None:
"""
Periodic function.
"""
try:
# Send frame and pace to target FPS
self.send_frame()
self.sleep_until_next_frame()

now = time.time()
if now >= self.__next_image_time:
# Cycle image once per second
try:
self.next_image()
self.update_current_image()
except Exception as exc: # pylint: disable=broad-except
print(f"HITL camera image update error: {exc}")
self.__next_image_time = now + self.__time_between_images
except Exception as exc: # pylint: disable=broad-except
print(f"HITL camera periodic error: {exc}")

def send_frame(self) -> None:
"""
sends a new frame to virtual camera, should be called in a loop
Expand Down
89 changes: 69 additions & 20 deletions modules/hitl/hitl_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import time
from threading import Event, Thread
from modules.hitl.position_emulator import PositionEmulator
from modules.hitl.camera_emulator import CameraEmulator
from ..mavlink import dronekit
Expand Down Expand Up @@ -47,7 +48,6 @@ def create(

if position_module:
result, position_emulator = PositionEmulator.create(drone)
position_emulator.inject_position() # Inject initial position
if not result:
return False, None

Expand Down Expand Up @@ -81,26 +81,75 @@ def __init__(
self.position_emulator = position_emulator
self.camera_emulator = camera_emulator

def set_inject_position(self) -> None:
self._stop_event: Event | None = None
self._threads: list[Thread] = []

def start(self) -> None:
"""
Start HITL module threads.
"""
Set the position to inject into the drone.
Print out a message if position emulator is not enabled.
if self._stop_event is not None:
return

self._stop_event = Event()
self._threads = []

if self.position_emulator is not None:
t = Thread(target=self.run_position, name="HITL-Position", daemon=True)
self._threads.append(t)
t.start()

if self.camera_emulator is not None:
t = Thread(target=self.run_camera, name="HITL-Camera", daemon=True)
self._threads.append(t)
t.start()

def shutdown(self, join_timeout: float | None = 5.0) -> None:
"""
Signal threads to stop and join them.
"""
if self.position_emulator is None:
print("Position emulator is not enabled.")
if self._stop_event is None:
return
# pylint: disable=protected-access
position_target = self.drone._master.recv_match(...)
# pylint: enable=protected-access
if position_target:
latitude = position_target.lat_int / 1e7
longitude = position_target.lon_int / 1e7
altitude = position_target.alt

self.position_emulator.inject_position(latitude, longitude, altitude)
print(f"Injected position: lat={latitude}, lon={longitude}, alt={altitude}")
else:
print("No POSITION_TARGET_GLOBAL_INT message received.")

time.sleep(3)

self._stop_event.set()

for t in self._threads:
if t.is_alive():
t.join(timeout=join_timeout)

self._threads.clear()
self._stop_event = None

def __del__(self) -> None:
"""
Best-effort cleanup when HITL object is destroyed.
Ensures threads are stopped and the drone connection is closed.
"""
try:
self.shutdown()
except Exception: # pylint: disable=broad-except
pass

def run_position(self) -> None:
"""
Runs the position emulator periodic function in a loop.
"""
assert self._stop_event is not None
while not self._stop_event.is_set():
try:
self.position_emulator.periodic()
except Exception as exc: # pylint: disable=broad-except
print(f"HITL position thread error: {exc}")
time.sleep(0.1)

def run_camera(self) -> None:
"""
Runs the camera emulator periodic function in a loop.
"""
assert self._stop_event is not None
while not self._stop_event.is_set():
try:
self.camera_emulator.periodic()
except Exception as exc: # pylint: disable=broad-except
print(f"HITL camera thread error: {exc}")
time.sleep(0.1)
54 changes: 53 additions & 1 deletion modules/hitl/position_emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,61 @@ def __init__(self, class_private_create_key: object, drone: dronekit.Vehicle) ->
"""
assert class_private_create_key is PositionEmulator.__create_key, "Use create() method"

self.target_position = (43.43405014107003, -80.57898027451816, 373.0) # lat, lon, alt

self.drone = drone

def set_target_position(self, latitude: float, longitude: float, altitude: float) -> None:
"""
Sets the target position manually (currently a fallback if Ardupilot target doesnt work).

Args:
latitude: Latitude in degrees.
longitude: Longitude in degrees.
altitude: Altitude in meters.
"""
self.target_position = (latitude, longitude, altitude)

def get_target_position(self) -> tuple[float, float, float]:
"""
Gets the target position from the Ardupilot target.

Returns:
Target position as (latitude, longitude, altitude).
"""
# pylint: disable=protected-access
position_target = None
try:
position_target = self.drone._master.recv_match(
type="POSITION_TARGET_GLOBAL_INT", blocking=False
)
except Exception as exc: # pylint: disable=broad-except
print(f"HITL get_target_position recv_match error: {exc}")
position_target = None
# pylint: enable=protected-access

if position_target:
latitude = position_target.lat_int / 1e7
longitude = position_target.lon_int / 1e7
altitude = position_target.alt
return (latitude, longitude, altitude)

# Optionally log if no message received
# print("No POSITION_TARGET_GLOBAL_INT message received.")

return self.target_position

def periodic(self) -> None:
"""
Periodic function.
"""

self.target_position = self.get_target_position()

self.inject_position(
self.target_position[0], self.target_position[1], self.target_position[2]
)

def inject_position(
self,
latitude: float = 43.43405014107003,
Expand Down Expand Up @@ -68,7 +121,6 @@ def inject_position(
10, # satellites_visible
0, # yaw (deg*100)
]
print("Packing values:", values)
gps_input_msg = self.drone.message_factory.gps_input_encode(*values)
self.drone.send_mavlink(gps_input_msg)
self.drone.flush()
5 changes: 4 additions & 1 deletion modules/mavlink/flight_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ def create(
success, hitl_instance = hitl_base.HITL.create(
drone, hitl_enabled, position_module, camera_module, images_path
)
if not success:
if success:
if hitl_enabled and hitl_instance is not None:
hitl_instance.start()
else:
print("Error creating HITL module")

except dronekit.TimeoutError:
Expand Down
37 changes: 37 additions & 0 deletions tests/integration/hitl/test_threading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
Tests the threading behavior of the HITL module (gps and camera modules).
"""

# Physical connection to Pixhawk: /dev/ttyAMA0
# Simulated connection to Pixhawk: tcp:localhost:5762
PIXHAWK_ADDRESS = "/dev/ttyAMA0"

import os
import time
from modules.mavlink import flight_controller


def main() -> int:
"""
Main function.
"""
images_folder_path = os.path.join("tests", "integration", "camera_emulator", "images")

result, controller = flight_controller.FlightController.create(
PIXHAWK_ADDRESS, 57600, True, True, True, images_folder_path
)
if not result:
print("Failed to create flight controller")
return -1

time.sleep(10)

return 0


if __name__ == "__main__":
result_main = main()
if result_main != 0:
print(f"ERROR: Status code: {result_main}")

print("Done!")