diff --git a/modules/hitl/camera_emulator.py b/modules/hitl/camera_emulator.py index bfd20c3..07e82a8 100644 --- a/modules/hitl/camera_emulator.py +++ b/modules/hitl/camera_emulator.py @@ -6,6 +6,7 @@ """ import os +import time import pyvirtualcam import cv2 @@ -22,18 +23,34 @@ class CameraEmulator: __create_key = object() @classmethod - def create(cls, images_path: str) -> "tuple[True, CameraEmulator] | tuple[False, None]": + def create( + cls, images_path: str, time_between_images: float = 1.0 + ) -> "tuple[True, CameraEmulator] | tuple[False, None]": """ Setup camera emulator. Args: images_path: Path to the directory containing images for the camera emulator. Cycles through these images to simulate camera input (every 1 second). + time_between_images: Time in seconds between image changes. Returns: Success, CameraEmulator instance. """ if not isinstance(images_path, str): + print("Images path is not a string") + return False, None + + if not os.path.isdir(images_path): + print("Images path is not a valid directory") + return False, None + + if not isinstance(time_between_images, (int, float)): + print("Time between images is not a number") + return False, None + + if time_between_images <= 0: + print("Time between images must be positive") return False, None try: @@ -51,10 +68,16 @@ def create(cls, images_path: str) -> "tuple[True, CameraEmulator] | tuple[False, if virtual_camera_instance is None: return False, None - return True, CameraEmulator(cls.__create_key, images_path, virtual_camera_instance) + return True, CameraEmulator( + cls.__create_key, images_path, time_between_images, virtual_camera_instance + ) def __init__( - self, class_private_create_key: object, images_path: str, virtual_camera: pyvirtualcam + self, + class_private_create_key: object, + images_path: str, + time_between_images: float, + virtual_camera: pyvirtualcam, ) -> None: """ Private constructor, use create() method. @@ -66,10 +89,33 @@ def __init__( self.__image_paths: "list[str]" = [] self.__current_frame = None self.__image_index = 0 + self.__next_image_time = time.time() + time_between_images + self.__time_between_images = time_between_images self.__get_images() self.update_current_image() + def periodic(self) -> None: + """ + Periodic function. + """ + try: + # Send frame and pace to target FPS + self.send_frame() + self.sleep_until_next_frame() + + now = time.time() + if now >= self.__next_image_time: + # Cycle image once per second + try: + self.next_image() + self.update_current_image() + except Exception as exc: # pylint: disable=broad-except + print(f"HITL camera image update error: {exc}") + self.__next_image_time = now + self.__time_between_images + except Exception as exc: # pylint: disable=broad-except + print(f"HITL camera periodic error: {exc}") + def send_frame(self) -> None: """ sends a new frame to virtual camera, should be called in a loop diff --git a/modules/hitl/hitl_base.py b/modules/hitl/hitl_base.py index d6e3982..5c40b3d 100644 --- a/modules/hitl/hitl_base.py +++ b/modules/hitl/hitl_base.py @@ -3,6 +3,7 @@ """ import time +from threading import Event, Thread from modules.hitl.position_emulator import PositionEmulator from modules.hitl.camera_emulator import CameraEmulator from ..mavlink import dronekit @@ -47,7 +48,6 @@ def create( if position_module: result, position_emulator = PositionEmulator.create(drone) - position_emulator.inject_position() # Inject initial position if not result: return False, None @@ -81,26 +81,75 @@ def __init__( self.position_emulator = position_emulator self.camera_emulator = camera_emulator - def set_inject_position(self) -> None: + self._stop_event: Event | None = None + self._threads: list[Thread] = [] + + def start(self) -> None: + """ + Start HITL module threads. """ - Set the position to inject into the drone. - Print out a message if position emulator is not enabled. + if self._stop_event is not None: + return + + self._stop_event = Event() + self._threads = [] + + if self.position_emulator is not None: + t = Thread(target=self.run_position, name="HITL-Position", daemon=True) + self._threads.append(t) + t.start() + if self.camera_emulator is not None: + t = Thread(target=self.run_camera, name="HITL-Camera", daemon=True) + self._threads.append(t) + t.start() + + def shutdown(self, join_timeout: float | None = 5.0) -> None: + """ + Signal threads to stop and join them. """ - if self.position_emulator is None: - print("Position emulator is not enabled.") + if self._stop_event is None: return - # pylint: disable=protected-access - position_target = self.drone._master.recv_match(...) - # pylint: enable=protected-access - if position_target: - latitude = position_target.lat_int / 1e7 - longitude = position_target.lon_int / 1e7 - altitude = position_target.alt - - self.position_emulator.inject_position(latitude, longitude, altitude) - print(f"Injected position: lat={latitude}, lon={longitude}, alt={altitude}") - else: - print("No POSITION_TARGET_GLOBAL_INT message received.") - - time.sleep(3) + + self._stop_event.set() + + for t in self._threads: + if t.is_alive(): + t.join(timeout=join_timeout) + + self._threads.clear() + self._stop_event = None + + def __del__(self) -> None: + """ + Best-effort cleanup when HITL object is destroyed. + Ensures threads are stopped and the drone connection is closed. + """ + try: + self.shutdown() + except Exception: # pylint: disable=broad-except + pass + + def run_position(self) -> None: + """ + Runs the position emulator periodic function in a loop. + """ + assert self._stop_event is not None + while not self._stop_event.is_set(): + try: + self.position_emulator.periodic() + except Exception as exc: # pylint: disable=broad-except + print(f"HITL position thread error: {exc}") + time.sleep(0.1) + + def run_camera(self) -> None: + """ + Runs the camera emulator periodic function in a loop. + """ + assert self._stop_event is not None + while not self._stop_event.is_set(): + try: + self.camera_emulator.periodic() + except Exception as exc: # pylint: disable=broad-except + print(f"HITL camera thread error: {exc}") + time.sleep(0.1) diff --git a/modules/hitl/position_emulator.py b/modules/hitl/position_emulator.py index 50c5ac7..eb3c3cb 100644 --- a/modules/hitl/position_emulator.py +++ b/modules/hitl/position_emulator.py @@ -32,8 +32,61 @@ def __init__(self, class_private_create_key: object, drone: dronekit.Vehicle) -> """ assert class_private_create_key is PositionEmulator.__create_key, "Use create() method" + self.target_position = (43.43405014107003, -80.57898027451816, 373.0) # lat, lon, alt + self.drone = drone + def set_target_position(self, latitude: float, longitude: float, altitude: float) -> None: + """ + Sets the target position manually (currently a fallback if Ardupilot target doesnt work). + + Args: + latitude: Latitude in degrees. + longitude: Longitude in degrees. + altitude: Altitude in meters. + """ + self.target_position = (latitude, longitude, altitude) + + def get_target_position(self) -> tuple[float, float, float]: + """ + Gets the target position from the Ardupilot target. + + Returns: + Target position as (latitude, longitude, altitude). + """ + # pylint: disable=protected-access + position_target = None + try: + position_target = self.drone._master.recv_match( + type="POSITION_TARGET_GLOBAL_INT", blocking=False + ) + except Exception as exc: # pylint: disable=broad-except + print(f"HITL get_target_position recv_match error: {exc}") + position_target = None + # pylint: enable=protected-access + + if position_target: + latitude = position_target.lat_int / 1e7 + longitude = position_target.lon_int / 1e7 + altitude = position_target.alt + return (latitude, longitude, altitude) + + # Optionally log if no message received + # print("No POSITION_TARGET_GLOBAL_INT message received.") + + return self.target_position + + def periodic(self) -> None: + """ + Periodic function. + """ + + self.target_position = self.get_target_position() + + self.inject_position( + self.target_position[0], self.target_position[1], self.target_position[2] + ) + def inject_position( self, latitude: float = 43.43405014107003, @@ -68,7 +121,6 @@ def inject_position( 10, # satellites_visible 0, # yaw (deg*100) ] - print("Packing values:", values) gps_input_msg = self.drone.message_factory.gps_input_encode(*values) self.drone.send_mavlink(gps_input_msg) self.drone.flush() diff --git a/modules/mavlink/flight_controller.py b/modules/mavlink/flight_controller.py index 9c23490..7d46d70 100644 --- a/modules/mavlink/flight_controller.py +++ b/modules/mavlink/flight_controller.py @@ -229,7 +229,10 @@ def create( success, hitl_instance = hitl_base.HITL.create( drone, hitl_enabled, position_module, camera_module, images_path ) - if not success: + if success: + if hitl_enabled and hitl_instance is not None: + hitl_instance.start() + else: print("Error creating HITL module") except dronekit.TimeoutError: diff --git a/tests/integration/hitl/test_threading.py b/tests/integration/hitl/test_threading.py new file mode 100644 index 0000000..1a17542 --- /dev/null +++ b/tests/integration/hitl/test_threading.py @@ -0,0 +1,37 @@ +""" +Tests the threading behavior of the HITL module (gps and camera modules). +""" + +# Physical connection to Pixhawk: /dev/ttyAMA0 +# Simulated connection to Pixhawk: tcp:localhost:5762 +PIXHAWK_ADDRESS = "/dev/ttyAMA0" + +import os +import time +from modules.mavlink import flight_controller + + +def main() -> int: + """ + Main function. + """ + images_folder_path = os.path.join("tests", "integration", "camera_emulator", "images") + + result, controller = flight_controller.FlightController.create( + PIXHAWK_ADDRESS, 57600, True, True, True, images_folder_path + ) + if not result: + print("Failed to create flight controller") + return -1 + + time.sleep(10) + + return 0 + + +if __name__ == "__main__": + result_main = main() + if result_main != 0: + print(f"ERROR: Status code: {result_main}") + + print("Done!")