diff --git a/config.yaml b/config.yaml index c10d0c4e..ce511130 100644 --- a/config.yaml +++ b/config.yaml @@ -3,25 +3,40 @@ queue_max_size: 10 video_input: - camera_name: 0 worker_period: 1.0 # seconds - save_prefix: "log_image" + camera_enum: 0 # Enum values can be found in camera_factory.py + width: 1920 + height: 1200 + # For camera_enum=0, use the OpenCV camera config. For camera_enum=1, use the PiCamera2 config + # OpenCV camera config (regular cameras, enum 0) + camera_config: + device_index: 0 + # PiCamera2 camera config (PiCamera NoIR, enum 1) + # camera_config: + # exposure_time: 250 # microseconds + # analogue_gain: 64.0 # Sets ISO, 1.0 for normal, 64.0 for max, 0.0 for min + # contrast: 1.0 # Contrast, 1.0 for nomral, 32.0 for max, 0.0 for min + # lens_position: null # Focal length, 1/m (0 for infinity, null for auto focus) + log_images: true # Set to true to save images + image_name: "log_image" # Image name when saving images detect_target: worker_count: 1 option: 0 # 0 is for Ultralytics (from detect_target_factory.py) device: 0 - model_path: "tests/model_example/yolov8s_ultralytics_pretrained_default.pt" # TODO: update + model_path: "tests/model_example/yolov8s_ultralytics_pretrained_default.pt" # See autonomy OneDrive for latest model save_prefix: "log_comp" flight_interface: - address: "tcp:127.0.0.1:14550" - timeout: 10.0 # seconds + # Port 5762 connects directly to the simulated auto pilot, which is more realistic + # than connecting to port 14550, which is the ground station + address: "tcp:localhost:5762" + timeout: 30.0 # seconds baud_rate: 57600 # symbol rate worker_period: 0.1 # seconds data_merge: - timeout: 10.0 # seconds + timeout: 30.0 # seconds geolocation: resolution_x: 1920 diff --git a/main_2024.py b/main_2024.py index 9c59908d..7ce84c27 100644 --- a/main_2024.py +++ b/main_2024.py @@ -12,6 +12,9 @@ # Used in type annotation of flight interface output # pylint: disable-next=unused-import from modules import odometry_and_time +from modules.common.modules.camera import camera_factory +from modules.common.modules.camera import camera_opencv +from modules.common.modules.camera import camera_picamera2 from modules.communications import communications_worker from modules.detect_target import detect_target_factory from modules.detect_target import detect_target_worker @@ -81,19 +84,37 @@ def main() -> int: # pylint: disable=invalid-name QUEUE_MAX_SIZE = config["queue_max_size"] - VIDEO_INPUT_CAMERA_NAME = config["video_input"]["camera_name"] VIDEO_INPUT_WORKER_PERIOD = config["video_input"]["worker_period"] - VIDEO_INPUT_SAVE_NAME_PREFIX = config["video_input"]["save_prefix"] - VIDEO_INPUT_SAVE_PREFIX = str(pathlib.Path(logging_path, VIDEO_INPUT_SAVE_NAME_PREFIX)) + VIDEO_INPUT_OPTION = camera_factory.CameraOption(config["video_input"]["camera_enum"]) + VIDEO_INPUT_WIDTH = config["video_input"]["width"] + VIDEO_INPUT_HEIGHT = config["video_input"]["height"] + match VIDEO_INPUT_OPTION: + case camera_factory.CameraOption.OPENCV: + VIDEO_INPUT_CAMERA_CONFIG = camera_opencv.ConfigOpenCV( + **config["video_input"]["camera_config"] + ) + case camera_factory.CameraOption.PICAM2: + VIDEO_INPUT_CAMERA_CONFIG = camera_picamera2.ConfigPiCamera2( + **config["video_input"]["camera_config"] + ) + case _: + main_logger.error(f"Inputted an invalid camera option: {VIDEO_INPUT_OPTION}", True) + return -1 + + VIDEO_INPUT_IMAGE_NAME = ( + config["video_input"]["image_name"] if config["video_input"]["log_images"] else None + ) DETECT_TARGET_WORKER_COUNT = config["detect_target"]["worker_count"] - DETECT_TARGET_OPTION_INT = config["detect_target"]["option"] - DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(DETECT_TARGET_OPTION_INT) + DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption( + config["detect_target"]["option"] + ) DETECT_TARGET_DEVICE = "cpu" if args.cpu else config["detect_target"]["device"] DETECT_TARGET_MODEL_PATH = config["detect_target"]["model_path"] DETECT_TARGET_OVERRIDE_FULL_PRECISION = args.full - DETECT_TARGET_SAVE_NAME_PREFIX = config["detect_target"]["save_prefix"] - DETECT_TARGET_SAVE_PREFIX = str(pathlib.Path(logging_path, DETECT_TARGET_SAVE_NAME_PREFIX)) + DETECT_TARGET_SAVE_PREFIX = str( + pathlib.Path(logging_path, config["detect_target"]["save_prefix"]) + ) DETECT_TARGET_SHOW_ANNOTATED = args.show_annotated FLIGHT_INTERFACE_ADDRESS = config["flight_interface"]["address"] @@ -125,7 +146,7 @@ def main() -> int: main_logger.error(f"Config key(s) not found: {exception}", True) return -1 except ValueError as exception: - main_logger.error(f"Could not convert detect target option into enum: {exception}", True) + main_logger.error(f"{exception}", True) return -1 # Setup @@ -199,9 +220,12 @@ def main() -> int: count=1, target=video_input_worker.video_input_worker, work_arguments=( - VIDEO_INPUT_CAMERA_NAME, + VIDEO_INPUT_OPTION, + VIDEO_INPUT_WIDTH, + VIDEO_INPUT_HEIGHT, + VIDEO_INPUT_CAMERA_CONFIG, + VIDEO_INPUT_IMAGE_NAME, VIDEO_INPUT_WORKER_PERIOD, - VIDEO_INPUT_SAVE_PREFIX, ), input_queues=[], output_queues=[video_input_to_detect_target_queue], diff --git a/modules/detect_target/detect_target_brightspot.py b/modules/detect_target/detect_target_brightspot.py new file mode 100644 index 00000000..b868ae24 --- /dev/null +++ b/modules/detect_target/detect_target_brightspot.py @@ -0,0 +1,153 @@ +""" +Detects bright spots in images. +""" + +import time + +import cv2 +import numpy as np + +from . import base_detect_target +from .. import detections_and_time +from .. import image_and_time +from ..common.modules.logger import logger + + +BRIGHTSPOT_PERCENTILE = 99.9 + +# Label for brightspots; is 1 since 0 is used for blue landing pads +DETECTION_LABEL = 1 +# SimpleBlobDetector is a binary detector, so a detection has confidence 1.0 by default +CONFIDENCE = 1.0 + + +class DetectTargetBrightspot(base_detect_target.BaseDetectTarget): + """ + Detects bright spots in images. + """ + + def __init__( + self, + local_logger: logger.Logger, + show_annotations: bool = False, + save_name: str = "", + ) -> None: + """ + Initializes the bright spot detector. + + show_annotations: Display annotated images. + save_name: Filename prefix for logging detections and annotated images. + """ + self.__counter = 0 + self.__local_logger = local_logger + self.__show_annotations = show_annotations + self.__filename_prefix = "" + if save_name != "": + self.__filename_prefix = f"{save_name}_{int(time.time())}_" + + def run( + self, data: image_and_time.ImageAndTime + ) -> tuple[True, detections_and_time.DetectionsAndTime] | tuple[False, None]: + """ + Runs brightspot detection on the provided image and returns the detections. + + data: Image with a timestamp. + + Return: Success, detections. + """ + start_time = time.time() + + image = data.image + try: + grey_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + # Catching all exceptions for library call + # pylint: disable-next=broad-exception-caught + except Exception as exception: + self.__local_logger.error( + f"{time.time()}: Failed to convert to greyscale, exception: {exception}" + ) + return False, None + + brightspot_threshold = np.percentile(grey_image, BRIGHTSPOT_PERCENTILE) + + # Apply thresholding to isolate bright spots + threshold_used, bw_image = cv2.threshold( + grey_image, brightspot_threshold, 255, cv2.THRESH_BINARY + ) + if threshold_used == 0: + self.__local_logger.error(f"{time.time()}: Failed to threshold image.") + return False, None + + # Set up SimpleBlobDetector + params = cv2.SimpleBlobDetector_Params() + params.filterByColor = True + params.blobColor = 255 + params.filterByCircularity = False + params.filterByInertia = True + params.minInertiaRatio = 0.2 + params.filterByConvexity = False + params.filterByArea = True + params.minArea = 50 # pixels + + detector = cv2.SimpleBlobDetector_create(params) + keypoints = detector.detect(bw_image) + + # A lack of detections is not an error, but should still not be forwarded + if len(keypoints) == 0: + self.__local_logger.info(f"{time.time()}: No brightspots detected.") + return False, None + + # Annotate the image (green circle) with detected keypoints + image_annotated = cv2.drawKeypoints( + image, keypoints, None, (0, 255, 0), cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS + ) + + # Process bright spot detection + result, detections = detections_and_time.DetectionsAndTime.create(data.timestamp) + if not result: + self.__local_logger.error(f"{time.time()}: Failed to create detections for image.") + return False, None + + # Get Pylance to stop complaining + assert detections is not None + + # Draw bounding boxes around detected keypoints + for keypoint in keypoints: + x, y = keypoint.pt + size = keypoint.size + bounds = np.array([x - size / 2, y - size / 2, x + size / 2, y + size / 2]) + result, detection = detections_and_time.Detection.create( + bounds, DETECTION_LABEL, CONFIDENCE + ) + if not result: + self.__local_logger.error(f"{time.time()}: Failed to create bounding boxes.") + return False, None + + # Get Pylance to stop complaining + assert detections is not None + + detections.append(detection) + + # Logging is identical to detect_target_ultralytics.py + # pylint: disable=duplicate-code + end_time = time.time() + + # Logging + self.__local_logger.info( + f"{time.time()}: Count: {self.__counter}. Target detection took {end_time - start_time} seconds. Objects detected: {detections}." + ) + + if self.__filename_prefix != "": + filename = self.__filename_prefix + str(self.__counter) + + # Annotated image + cv2.imwrite(filename + ".png", image_annotated) # type: ignore + + self.__counter += 1 + + if self.__show_annotations: + cv2.imshow("Annotated", image_annotated) # type: ignore + + # pylint: enable=duplicate-code + + return True, detections diff --git a/modules/detect_target/detect_target_factory.py b/modules/detect_target/detect_target_factory.py index ca37a14b..013cba3e 100644 --- a/modules/detect_target/detect_target_factory.py +++ b/modules/detect_target/detect_target_factory.py @@ -3,8 +3,10 @@ """ import enum +import torch from . import base_detect_target +from . import detect_target_brightspot from . import detect_target_ultralytics from ..common.modules.logger import logger @@ -15,6 +17,7 @@ class DetectTargetOption(enum.Enum): """ ML_ULTRALYTICS = 0 + CV_BRIGHTSPOT = 1 def create_detect_target( @@ -27,8 +30,16 @@ def create_detect_target( save_name: str, ) -> tuple[bool, base_detect_target.BaseDetectTarget | None]: """ - Construct detect target class at runtime. + Factory function to create a detection target object. + + Return: + Success, detect target object. """ + # Fall back to CPU if no GPU is available + if device != "cpu" and not torch.cuda.is_available(): + local_logger.warning("CUDA not available. Falling back to CPU.") + device = "cpu" + match detect_target_option: case DetectTargetOption.ML_ULTRALYTICS: return True, detect_target_ultralytics.DetectTargetUltralytics( @@ -39,5 +50,11 @@ def create_detect_target( show_annotations, save_name, ) + case DetectTargetOption.CV_BRIGHTSPOT: + return True, detect_target_brightspot.DetectTargetBrightspot( + local_logger, + show_annotations, + save_name, + ) return False, None diff --git a/modules/detect_target/detect_target_ultralytics.py b/modules/detect_target/detect_target_ultralytics.py index 4f9bddc1..ef3bc725 100644 --- a/modules/detect_target/detect_target_ultralytics.py +++ b/modules/detect_target/detect_target_ultralytics.py @@ -5,8 +5,10 @@ import time import cv2 +import torch import ultralytics + from . import base_detect_target from .. import image_and_time from .. import detections_and_time @@ -37,7 +39,7 @@ def __init__( self.__device = device self.__model = ultralytics.YOLO(model_path) self.__counter = 0 - self.__enable_half_precision = not self.__device == "cpu" + self.__enable_half_precision = self.__device != "cpu" self.__local_logger = local_logger self.__show_annotations = show_annotations if override_full: @@ -45,6 +47,10 @@ def __init__( self.__filename_prefix = "" if save_name != "": self.__filename_prefix = save_name + "_" + str(int(time.time())) + "_" + + if self.__device != "cpu" and not torch.cuda.is_available(): + self.__local_logger.warning("CUDA not available. Falling back to CPU.") + self.__device = "cpu" def run( self, data: image_and_time.ImageAndTime @@ -111,6 +117,13 @@ def run( self.__counter += 1 if self.__show_annotations: - cv2.imshow("Annotated", image_annotated) # type: ignore + if image_annotated is None: + self.__local_logger.error("Annotated image is invalid.") + return False, detections + + + # Display the annotated image in a named window + cv2.imshow("Annotated", image_annotated) + cv2.waitKey(1) # Short delay to process GUI events return True, detections diff --git a/modules/video_input/video_input.py b/modules/video_input/video_input.py index faaef9ac..0a62415a 100644 --- a/modules/video_input/video_input.py +++ b/modules/video_input/video_input.py @@ -3,7 +3,11 @@ """ from .. import image_and_time -from ..common.modules.camera import camera_device +from ..common.modules.camera import base_camera +from ..common.modules.camera import camera_factory +from ..common.modules.camera import camera_opencv +from ..common.modules.camera import camera_picamera2 +from ..common.modules.logger import logger class VideoInput: @@ -11,15 +15,60 @@ class VideoInput: Combines image and timestamp together. """ - def __init__(self, camera_name: "int | str", save_name: str = "") -> None: - self.device = camera_device.CameraDevice(camera_name, 1, save_name) + __create_key = object() - def run(self) -> "tuple[bool, image_and_time.ImageAndTime | None]": + @classmethod + def create( + cls, + camera_option: camera_factory.CameraOption, + width: int, + height: int, + config: camera_opencv.ConfigOpenCV | camera_picamera2.ConfigPiCamera2, + maybe_image_name: str | None, + local_logger: logger.Logger, + ) -> "tuple[True, VideoInput] | tuple[False, None]": + """ + camera_option specifies which camera driver to use. + width is the width of the images the camera takes in pixels. + height is the height of the images the camera takes in pixels. + camera_config specifies camera settings. + maybe_image_name is the filename to save the image as. Set to None to not log any images. + """ + result, camera = camera_factory.create_camera(camera_option, width, height, config) + if not result: + local_logger.error( + f"Camera factory failed. Current configs were: {camera_option=}, {width=}, {height=}, {config=}." + ) + return False, None + + return True, VideoInput(cls.__create_key, camera, maybe_image_name, local_logger) + + def __init__( + self, + class_private_create_key: object, + camera: base_camera.BaseCameraDevice, + maybe_image_name: str | None, + local_logger: logger.Logger, + ) -> None: + """ + Private constructor, use create() method. + """ + assert class_private_create_key is VideoInput.__create_key, "Use create() method." + + self.__device = camera + self.__maybe_image_name = maybe_image_name + self.__logger = local_logger + + def run(self) -> "tuple[True, image_and_time.ImageAndTime] | tuple[False, None]": """ Returns a possible ImageAndTime with current timestamp. """ - result, image = self.device.get_image() + result, image = self.__device.run() if not result: + self.__logger.warning("Failed to take image") return False, None + if self.__maybe_image_name is not None: + self.__logger.save_image(image, self.__maybe_image_name) + return image_and_time.ImageAndTime.create(image) diff --git a/modules/video_input/video_input_worker.py b/modules/video_input/video_input_worker.py index aa7ea171..a3b8cf97 100644 --- a/modules/video_input/video_input_worker.py +++ b/modules/video_input/video_input_worker.py @@ -2,30 +2,56 @@ Gets images from the camera. """ +import os +import pathlib import time from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from . import video_input +from ..common.modules.camera import camera_factory +from ..common.modules.camera import camera_opencv +from ..common.modules.camera import camera_picamera2 +from ..common.modules.logger import logger def video_input_worker( - camera_name: "int | str", + camera_option: camera_factory.CameraOption, + width: int, + height: int, + camera_config: camera_opencv.ConfigOpenCV | camera_picamera2.ConfigPiCamera2, + maybe_image_name: str | None, period: float, - save_name: str, output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ Worker process. - camera_name is initial setting. - period is minimum period between loops. - save_name is path for logging. + period is the minimum period between image captures in seconds. output_queue is the data queue. controller is how the main process communicates to this worker process. """ - input_device = video_input.VideoInput(camera_name, save_name) + worker_name = pathlib.Path(__file__).stem + process_id = os.getpid() + result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) + if not result: + print("ERROR: Worker failed to create logger") + return + + assert local_logger is not None + + local_logger.info("Logger initialized") + + result, input_device = video_input.VideoInput.create( + camera_option, width, height, camera_config, maybe_image_name, local_logger + ) + if not result: + local_logger.error("Worker failed to create class object") + return + + # Get Pylance to stop complaining + assert input_device is not None while not controller.is_exit_requested(): controller.check_pause() diff --git a/tests/brightspot_example/bounding_box_ir_detections_0.txt b/tests/brightspot_example/bounding_box_ir_detections_0.txt new file mode 100644 index 00000000..425d0eb9 --- /dev/null +++ b/tests/brightspot_example/bounding_box_ir_detections_0.txt @@ -0,0 +1 @@ +1.000000 1.000000 545.888705 202.055392 555.831266 211.997953 diff --git a/tests/brightspot_example/bounding_box_ir_detections_1.txt b/tests/brightspot_example/bounding_box_ir_detections_1.txt new file mode 100644 index 00000000..8e5624b7 --- /dev/null +++ b/tests/brightspot_example/bounding_box_ir_detections_1.txt @@ -0,0 +1 @@ +1.000000 1.000000 443.409194 379.341292 453.529099 389.461198 diff --git a/tests/brightspot_example/bounding_box_ir_detections_2.txt b/tests/brightspot_example/bounding_box_ir_detections_2.txt new file mode 100644 index 00000000..d2775559 --- /dev/null +++ b/tests/brightspot_example/bounding_box_ir_detections_2.txt @@ -0,0 +1 @@ +1.000000 1.000000 270.872054 249.021590 288.934281 267.083817 diff --git a/tests/brightspot_example/bounding_box_ir_detections_3.txt b/tests/brightspot_example/bounding_box_ir_detections_3.txt new file mode 100644 index 00000000..a0d6d546 --- /dev/null +++ b/tests/brightspot_example/bounding_box_ir_detections_3.txt @@ -0,0 +1 @@ +1.000000 1.000000 630.931005 406.213048 640.793971 416.076014 diff --git a/tests/brightspot_example/bounding_box_ir_detections_4.txt b/tests/brightspot_example/bounding_box_ir_detections_4.txt new file mode 100644 index 00000000..7c4e5e8a --- /dev/null +++ b/tests/brightspot_example/bounding_box_ir_detections_4.txt @@ -0,0 +1 @@ +1.000000 1.000000 407.681973 162.778408 426.180088 181.276524 diff --git a/tests/brightspot_example/generate_expected.py b/tests/brightspot_example/generate_expected.py new file mode 100644 index 00000000..c7af3096 --- /dev/null +++ b/tests/brightspot_example/generate_expected.py @@ -0,0 +1,111 @@ +""" +Generates expected output for the brightspot detector. +""" + +import pathlib + +import cv2 +import numpy as np + +from modules import image_and_time +from modules.common.modules.logger import logger +from modules.detect_target import detect_target_brightspot + + +TEST_PATH = pathlib.Path("tests", "brightspot_example") + +NUMBER_OF_IMAGES_DETECTIONS = 5 +IMAGE_DETECTIONS_FILES = [ + pathlib.Path(f"ir_detections_{i}.png") for i in range(0, NUMBER_OF_IMAGES_DETECTIONS) +] +ANNOTATED_IMAGE_PATHS = [ + pathlib.Path(TEST_PATH, f"ir_detections_{i}_annotated.png") + for i in range(0, NUMBER_OF_IMAGES_DETECTIONS) +] +EXPECTED_DETECTIONS_PATHS = [ + pathlib.Path(TEST_PATH, f"bounding_box_ir_detections_{i}.txt") + for i in range(0, NUMBER_OF_IMAGES_DETECTIONS) +] + +NUMBER_OF_IMAGES_NO_DETECTIONS = 2 +IMAGE_NO_DETECTIONS_FILES = [ + pathlib.Path(f"ir_no_detections_{i}.png") for i in range(0, NUMBER_OF_IMAGES_NO_DETECTIONS) +] + + +def main() -> int: + """ + Main function. + """ + result, temp_logger = logger.Logger.create("test_logger", False) + if not temp_logger: + print("ERROR: Failed to create logger.") + return 1 + + detector = detect_target_brightspot.DetectTargetBrightspot( + local_logger=temp_logger, show_annotations=False, save_name="" + ) + + for image_file, annotated_image_path, expected_detections_path in zip( + IMAGE_DETECTIONS_FILES, ANNOTATED_IMAGE_PATHS, EXPECTED_DETECTIONS_PATHS + ): + image_path = pathlib.Path(TEST_PATH, image_file) + image = cv2.imread(str(image_path)) # type: ignore + result, image_data = image_and_time.ImageAndTime.create(image) + if not result: + temp_logger.error(f"Failed to load image {image_path}.") + continue + + # Get Pylance to stop complaining + assert image_data is not None + + result, detections = detector.run(image_data) + if not result: + temp_logger.error(f"Unable to get detections for {image_path}.") + continue + + # Get Pylance to stop complaining + assert detections is not None + + detections_list = [] + image_annotated = image.copy() + for detection in detections.detections: + confidence = detection.confidence + label = detection.label + x_1 = detection.x_1 + y_1 = detection.y_1 + x_2 = detection.x_2 + y_2 = detection.y_2 + detections_list.append([confidence, label, x_1, y_1, x_2, y_2]) + + cv2.rectangle(image_annotated, (int(x_1), int(y_1)), (int(x_2), int(y_2)), (0, 255, 0), 1) # type: ignore + + detections_array = np.array(detections_list) + + np.savetxt(expected_detections_path, detections_array, fmt="%.6f") + temp_logger.info(f"Expected detections saved to {expected_detections_path}.") + + result = cv2.imwrite(str(annotated_image_path), image_annotated) # type: ignore + if not result: + temp_logger.error(f"Failed to write image to {annotated_image_path}.") + continue + + temp_logger.info(f"Annotated image saved to {annotated_image_path}.") + + for image_file in IMAGE_NO_DETECTIONS_FILES: + result, detections = detector.run(image_data) + if result: + temp_logger.error(f"False positive detections in {image_path}.") + continue + + assert detections is None + + return 0 + + +if __name__ == "__main__": + result_main = main() + if result_main < 0: + print(f"ERROR: Status code: {result_main}") + else: + print("Done!") diff --git a/tests/brightspot_example/ir_detections_0.png b/tests/brightspot_example/ir_detections_0.png new file mode 100644 index 00000000..19fe943c Binary files /dev/null and b/tests/brightspot_example/ir_detections_0.png differ diff --git a/tests/brightspot_example/ir_detections_0_annotated.png b/tests/brightspot_example/ir_detections_0_annotated.png new file mode 100644 index 00000000..8a61279e Binary files /dev/null and b/tests/brightspot_example/ir_detections_0_annotated.png differ diff --git a/tests/brightspot_example/ir_detections_1.png b/tests/brightspot_example/ir_detections_1.png new file mode 100644 index 00000000..8e53a690 Binary files /dev/null and b/tests/brightspot_example/ir_detections_1.png differ diff --git a/tests/brightspot_example/ir_detections_1_annotated.png b/tests/brightspot_example/ir_detections_1_annotated.png new file mode 100644 index 00000000..912983e9 Binary files /dev/null and b/tests/brightspot_example/ir_detections_1_annotated.png differ diff --git a/tests/brightspot_example/ir_detections_2.png b/tests/brightspot_example/ir_detections_2.png new file mode 100644 index 00000000..152ff343 Binary files /dev/null and b/tests/brightspot_example/ir_detections_2.png differ diff --git a/tests/brightspot_example/ir_detections_2_annotated.png b/tests/brightspot_example/ir_detections_2_annotated.png new file mode 100644 index 00000000..f8399125 Binary files /dev/null and b/tests/brightspot_example/ir_detections_2_annotated.png differ diff --git a/tests/brightspot_example/ir_detections_3.png b/tests/brightspot_example/ir_detections_3.png new file mode 100644 index 00000000..fcafb259 Binary files /dev/null and b/tests/brightspot_example/ir_detections_3.png differ diff --git a/tests/brightspot_example/ir_detections_3_annotated.png b/tests/brightspot_example/ir_detections_3_annotated.png new file mode 100644 index 00000000..0dc474a4 Binary files /dev/null and b/tests/brightspot_example/ir_detections_3_annotated.png differ diff --git a/tests/brightspot_example/ir_detections_4.png b/tests/brightspot_example/ir_detections_4.png new file mode 100644 index 00000000..24c89374 Binary files /dev/null and b/tests/brightspot_example/ir_detections_4.png differ diff --git a/tests/brightspot_example/ir_detections_4_annotated.png b/tests/brightspot_example/ir_detections_4_annotated.png new file mode 100644 index 00000000..2299c384 Binary files /dev/null and b/tests/brightspot_example/ir_detections_4_annotated.png differ diff --git a/tests/brightspot_example/ir_no_detections_0.png b/tests/brightspot_example/ir_no_detections_0.png new file mode 100644 index 00000000..546c1809 Binary files /dev/null and b/tests/brightspot_example/ir_no_detections_0.png differ diff --git a/tests/brightspot_example/ir_no_detections_1.png b/tests/brightspot_example/ir_no_detections_1.png new file mode 100644 index 00000000..f9d7ea35 Binary files /dev/null and b/tests/brightspot_example/ir_no_detections_1.png differ diff --git a/tests/integration/test_video_input_hardware.py b/tests/integration/test_video_input_hardware.py index 2557e53d..b533fb00 100644 --- a/tests/integration/test_video_input_hardware.py +++ b/tests/integration/test_video_input_hardware.py @@ -2,21 +2,38 @@ Simple hardware test, requires camera. """ +import pathlib + +from modules.common.modules.camera import camera_factory +from modules.common.modules.camera import camera_opencv +from modules.common.modules.logger import logger from modules.video_input import video_input -CAMERA = 0 +# Modify as needed +CAMERA = camera_factory.CameraOption.OPENCV +WIDTH = 640 +HEIGHT = 480 +CONFIG = camera_opencv.ConfigOpenCV(0) +IMAGE_NAME = None # Not saving any pictures def main() -> int: """ Main function. """ + # Logger + test_name = pathlib.Path(__file__).stem + result, local_logger = logger.Logger.create(test_name, False) + assert result + assert local_logger is not None + # Setup - # TODO: Common change logging option - camera = video_input.VideoInput( - CAMERA, + result, camera = video_input.VideoInput.create( + CAMERA, WIDTH, HEIGHT, CONFIG, IMAGE_NAME, local_logger ) + assert result + assert camera is not None # Run result, image = camera.run() diff --git a/tests/integration/test_video_input_worker.py b/tests/integration/test_video_input_worker.py index 5de29148..a3b2efc0 100644 --- a/tests/integration/test_video_input_worker.py +++ b/tests/integration/test_video_input_worker.py @@ -6,14 +6,21 @@ import queue import time +from modules.common.modules.camera import camera_factory +from modules.common.modules.camera import camera_opencv from modules.video_input import video_input_worker from modules import image_and_time from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller +# Modify these settings as needed VIDEO_INPUT_WORKER_PERIOD = 1.0 -CAMERA = 0 +CAMERA = camera_factory.CameraOption.OPENCV +WIDTH = 640 +HEIGHT = 480 +CONFIG = camera_opencv.ConfigOpenCV(0) +IMAGE_NAME = None # Not saving any pictures def main() -> int: @@ -29,7 +36,16 @@ def main() -> int: worker = mp.Process( target=video_input_worker.video_input_worker, - args=(CAMERA, VIDEO_INPUT_WORKER_PERIOD, "", out_queue, controller), + args=( + CAMERA, + WIDTH, + HEIGHT, + CONFIG, + IMAGE_NAME, + VIDEO_INPUT_WORKER_PERIOD, + out_queue, + controller, + ), ) # Run diff --git a/tests/unit/test_detect_target_brightspot.py b/tests/unit/test_detect_target_brightspot.py new file mode 100644 index 00000000..a8f1a3c6 --- /dev/null +++ b/tests/unit/test_detect_target_brightspot.py @@ -0,0 +1,204 @@ +""" +Test DetectTargetBrightspot module. +""" + +import pathlib + +import cv2 +import numpy as np +import pytest + +from modules import detections_and_time +from modules import image_and_time +from modules.common.modules.logger import logger +from modules.detect_target import detect_target_brightspot + + +TEST_PATH = pathlib.Path("tests", "brightspot_example") + +NUMBER_OF_IMAGES_DETECTIONS = 5 +IMAGE_DETECTIONS_FILES = [ + pathlib.Path(f"ir_detections_{i}.png") for i in range(0, NUMBER_OF_IMAGES_DETECTIONS) +] +EXPECTED_DETECTIONS_PATHS = [ + pathlib.Path(TEST_PATH, f"bounding_box_ir_detections_{i}.txt") + for i in range(0, NUMBER_OF_IMAGES_DETECTIONS) +] +DETECTION_TEST_CASES = list(zip(IMAGE_DETECTIONS_FILES, EXPECTED_DETECTIONS_PATHS)) + +NUMBER_OF_IMAGES_NO_DETECTIONS = 2 +IMAGE_NO_DETECTIONS_FILES = [ + pathlib.Path(f"ir_no_detections_{i}.png") for i in range(0, NUMBER_OF_IMAGES_NO_DETECTIONS) +] +NO_DETECTION_TEST_CASES = IMAGE_NO_DETECTIONS_FILES + +BOUNDING_BOX_PRECISION_TOLERANCE = 3 +CONFIDENCE_PRECISION_TOLERANCE = 6 + + +# Test functions use test fixture signature names and access class privates +# No enable +# pylint: disable=protected-access,redefined-outer-name,duplicate-code + + +def compare_detections( + actual: detections_and_time.DetectionsAndTime, expected: detections_and_time.DetectionsAndTime +) -> None: + """ + Compare expected and actual detections. + """ + assert len(actual.detections) == len(expected.detections) + + for actual_detection, expected_detection in zip(actual.detections, expected.detections): + assert expected_detection.label == actual_detection.label + + np.testing.assert_almost_equal( + actual_detection.confidence, + expected_detection.confidence, + decimal=CONFIDENCE_PRECISION_TOLERANCE, + ) + + np.testing.assert_almost_equal( + actual_detection.x_1, + expected_detection.x_1, + decimal=BOUNDING_BOX_PRECISION_TOLERANCE, + ) + + np.testing.assert_almost_equal( + actual_detection.y_1, + expected_detection.y_1, + decimal=BOUNDING_BOX_PRECISION_TOLERANCE, + ) + + np.testing.assert_almost_equal( + actual_detection.x_2, + expected_detection.x_2, + decimal=BOUNDING_BOX_PRECISION_TOLERANCE, + ) + + np.testing.assert_almost_equal( + actual_detection.y_2, + expected_detection.y_2, + decimal=BOUNDING_BOX_PRECISION_TOLERANCE, + ) + + +def create_detections(detections_from_file: np.ndarray) -> detections_and_time.DetectionsAndTime: + """ + Create DetectionsAndTime from expected detections. + Format: [confidence, label, x_1, y_1, x_2, y_2]. + """ + result, detections = detections_and_time.DetectionsAndTime.create(0) + assert result + assert detections is not None + + if detections_from_file.size == 0: + return detections + + if detections_from_file.ndim == 1: + detections_from_file = detections_from_file.reshape(1, -1) + + assert detections_from_file.shape[1] == 6 + + for detection_data in detections_from_file: + confidence, label, x_1, y_1, x_2, y_2 = detection_data + bounds = np.array([x_1, y_1, x_2, y_2]) + result, detection = detections_and_time.Detection.create(bounds, int(label), confidence) + assert result + assert detection is not None + detections.append(detection) + + return detections + + +@pytest.fixture() +def detector() -> detect_target_brightspot.DetectTargetBrightspot: # type: ignore + """ + Construct DetectTargetBrightspot. + """ + result, test_logger = logger.Logger.create("test_logger", False) + + assert result + assert test_logger is not None + + detection = detect_target_brightspot.DetectTargetBrightspot(test_logger) + yield detection # type: ignore + + +@pytest.fixture(params=DETECTION_TEST_CASES) +def image_ir_detections(request: pytest.FixtureRequest) -> tuple[image_and_time.ImageAndTime, detections_and_time.DetectionsAndTime]: # type: ignore + """ + Load image and its corresponding expected detections. + """ + image_file, expected_detections_file = request.param + + image_path = pathlib.Path(TEST_PATH, image_file) + image = cv2.imread(str(image_path)) + assert image is not None + + result, ir_image = image_and_time.ImageAndTime.create(image) + assert result + assert ir_image is not None + + expected = np.loadtxt(expected_detections_file) + detections = create_detections(expected) + + yield ir_image, detections # type: ignore + + +@pytest.fixture(params=NO_DETECTION_TEST_CASES) +def image_ir_no_detections(request: pytest.FixtureRequest) -> image_and_time.ImageAndTime: # type: ignore + """ + Load image with no detections. + """ + image_file = request.param + + image_path = pathlib.Path(TEST_PATH, image_file) + image = cv2.imread(str(image_path)) + assert image is not None + + result, ir_image = image_and_time.ImageAndTime.create(image) + assert result + assert ir_image is not None + + yield ir_image # type: ignore + + +class TestBrightspotDetector: + """ + Tests `DetectTargetBrightspot.run()`. + """ + + def test_images_with_detections( + self, + detector: detect_target_brightspot.DetectTargetBrightspot, + image_ir_detections: tuple[ + image_and_time.ImageAndTime, detections_and_time.DetectionsAndTime + ], + ) -> None: + """ + Test detection on images where detections are expected. + """ + image, expected_detections = image_ir_detections + + result, actual = detector.run(image) + + assert result + assert actual is not None + + compare_detections(actual, expected_detections) + + def test_images_no_detections( + self, + detector: detect_target_brightspot.DetectTargetBrightspot, + image_ir_no_detections: image_and_time.ImageAndTime, + ) -> None: + """ + Test detection on images where no detections are expected. + """ + image = image_ir_no_detections + + result, actual = detector.run(image) + + assert result is False + assert actual is None