Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,40 @@
queue_max_size: 10

video_input:
camera_name: 0
worker_period: 1.0 # seconds
save_prefix: "log_image"
camera_enum: 0 # Enum values can be found in camera_factory.py
width: 1920
height: 1200
# For camera_enum=0, use the OpenCV camera config. For camera_enum=1, use the PiCamera2 config
# OpenCV camera config (regular cameras, enum 0)
camera_config:
device_index: 0
# PiCamera2 camera config (PiCamera NoIR, enum 1)
# camera_config:
# exposure_time: 250 # microseconds
# analogue_gain: 64.0 # Sets ISO, 1.0 for normal, 64.0 for max, 0.0 for min
# contrast: 1.0 # Contrast, 1.0 for nomral, 32.0 for max, 0.0 for min
# lens_position: null # Focal length, 1/m (0 for infinity, null for auto focus)
log_images: true # Set to true to save images
image_name: "log_image" # Image name when saving images

detect_target:
worker_count: 1
option: 0 # 0 is for Ultralytics (from detect_target_factory.py)
device: 0
model_path: "tests/model_example/yolov8s_ultralytics_pretrained_default.pt" # TODO: update
model_path: "tests/model_example/yolov8s_ultralytics_pretrained_default.pt" # See autonomy OneDrive for latest model
save_prefix: "log_comp"

flight_interface:
address: "tcp:127.0.0.1:14550"
timeout: 10.0 # seconds
# Port 5762 connects directly to the simulated auto pilot, which is more realistic
# than connecting to port 14550, which is the ground station
address: "tcp:localhost:5762"
timeout: 30.0 # seconds
baud_rate: 57600 # symbol rate
worker_period: 0.1 # seconds

data_merge:
timeout: 10.0 # seconds
timeout: 30.0 # seconds

geolocation:
resolution_x: 1920
Expand Down
44 changes: 34 additions & 10 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# Used in type annotation of flight interface output
# pylint: disable-next=unused-import
from modules import odometry_and_time
from modules.common.modules.camera import camera_factory
from modules.common.modules.camera import camera_opencv
from modules.common.modules.camera import camera_picamera2
from modules.communications import communications_worker
from modules.detect_target import detect_target_factory
from modules.detect_target import detect_target_worker
Expand Down Expand Up @@ -81,19 +84,37 @@ def main() -> int:
# pylint: disable=invalid-name
QUEUE_MAX_SIZE = config["queue_max_size"]

VIDEO_INPUT_CAMERA_NAME = config["video_input"]["camera_name"]
VIDEO_INPUT_WORKER_PERIOD = config["video_input"]["worker_period"]
VIDEO_INPUT_SAVE_NAME_PREFIX = config["video_input"]["save_prefix"]
VIDEO_INPUT_SAVE_PREFIX = str(pathlib.Path(logging_path, VIDEO_INPUT_SAVE_NAME_PREFIX))
VIDEO_INPUT_OPTION = camera_factory.CameraOption(config["video_input"]["camera_enum"])
VIDEO_INPUT_WIDTH = config["video_input"]["width"]
VIDEO_INPUT_HEIGHT = config["video_input"]["height"]
match VIDEO_INPUT_OPTION:
case camera_factory.CameraOption.OPENCV:
VIDEO_INPUT_CAMERA_CONFIG = camera_opencv.ConfigOpenCV(
**config["video_input"]["camera_config"]
)
case camera_factory.CameraOption.PICAM2:
VIDEO_INPUT_CAMERA_CONFIG = camera_picamera2.ConfigPiCamera2(
**config["video_input"]["camera_config"]
)
case _:
main_logger.error(f"Inputted an invalid camera option: {VIDEO_INPUT_OPTION}", True)
return -1

VIDEO_INPUT_IMAGE_NAME = (
config["video_input"]["image_name"] if config["video_input"]["log_images"] else None
)

DETECT_TARGET_WORKER_COUNT = config["detect_target"]["worker_count"]
DETECT_TARGET_OPTION_INT = config["detect_target"]["option"]
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(DETECT_TARGET_OPTION_INT)
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(
config["detect_target"]["option"]
)
DETECT_TARGET_DEVICE = "cpu" if args.cpu else config["detect_target"]["device"]
DETECT_TARGET_MODEL_PATH = config["detect_target"]["model_path"]
DETECT_TARGET_OVERRIDE_FULL_PRECISION = args.full
DETECT_TARGET_SAVE_NAME_PREFIX = config["detect_target"]["save_prefix"]
DETECT_TARGET_SAVE_PREFIX = str(pathlib.Path(logging_path, DETECT_TARGET_SAVE_NAME_PREFIX))
DETECT_TARGET_SAVE_PREFIX = str(
pathlib.Path(logging_path, config["detect_target"]["save_prefix"])
)
DETECT_TARGET_SHOW_ANNOTATED = args.show_annotated

FLIGHT_INTERFACE_ADDRESS = config["flight_interface"]["address"]
Expand Down Expand Up @@ -125,7 +146,7 @@ def main() -> int:
main_logger.error(f"Config key(s) not found: {exception}", True)
return -1
except ValueError as exception:
main_logger.error(f"Could not convert detect target option into enum: {exception}", True)
main_logger.error(f"{exception}", True)
return -1

# Setup
Expand Down Expand Up @@ -199,9 +220,12 @@ def main() -> int:
count=1,
target=video_input_worker.video_input_worker,
work_arguments=(
VIDEO_INPUT_CAMERA_NAME,
VIDEO_INPUT_OPTION,
VIDEO_INPUT_WIDTH,
VIDEO_INPUT_HEIGHT,
VIDEO_INPUT_CAMERA_CONFIG,
VIDEO_INPUT_IMAGE_NAME,
VIDEO_INPUT_WORKER_PERIOD,
VIDEO_INPUT_SAVE_PREFIX,
),
input_queues=[],
output_queues=[video_input_to_detect_target_queue],
Expand Down
153 changes: 153 additions & 0 deletions modules/detect_target/detect_target_brightspot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""
Detects bright spots in images.
"""

import time

import cv2
import numpy as np

from . import base_detect_target
from .. import detections_and_time
from .. import image_and_time
from ..common.modules.logger import logger


BRIGHTSPOT_PERCENTILE = 99.9

# Label for brightspots; is 1 since 0 is used for blue landing pads
DETECTION_LABEL = 1
# SimpleBlobDetector is a binary detector, so a detection has confidence 1.0 by default
CONFIDENCE = 1.0


class DetectTargetBrightspot(base_detect_target.BaseDetectTarget):
"""
Detects bright spots in images.
"""

def __init__(
self,
local_logger: logger.Logger,
show_annotations: bool = False,
save_name: str = "",
) -> None:
"""
Initializes the bright spot detector.

show_annotations: Display annotated images.
save_name: Filename prefix for logging detections and annotated images.
"""
self.__counter = 0
self.__local_logger = local_logger
self.__show_annotations = show_annotations
self.__filename_prefix = ""
if save_name != "":
self.__filename_prefix = f"{save_name}_{int(time.time())}_"

def run(
self, data: image_and_time.ImageAndTime
) -> tuple[True, detections_and_time.DetectionsAndTime] | tuple[False, None]:
"""
Runs brightspot detection on the provided image and returns the detections.

data: Image with a timestamp.

Return: Success, detections.
"""
start_time = time.time()

image = data.image
try:
grey_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Catching all exceptions for library call
# pylint: disable-next=broad-exception-caught
except Exception as exception:
self.__local_logger.error(
f"{time.time()}: Failed to convert to greyscale, exception: {exception}"
)
return False, None

brightspot_threshold = np.percentile(grey_image, BRIGHTSPOT_PERCENTILE)

# Apply thresholding to isolate bright spots
threshold_used, bw_image = cv2.threshold(
grey_image, brightspot_threshold, 255, cv2.THRESH_BINARY
)
if threshold_used == 0:
self.__local_logger.error(f"{time.time()}: Failed to threshold image.")
return False, None

# Set up SimpleBlobDetector
params = cv2.SimpleBlobDetector_Params()
params.filterByColor = True
params.blobColor = 255
params.filterByCircularity = False
params.filterByInertia = True
params.minInertiaRatio = 0.2
params.filterByConvexity = False
params.filterByArea = True
params.minArea = 50 # pixels

detector = cv2.SimpleBlobDetector_create(params)
keypoints = detector.detect(bw_image)

# A lack of detections is not an error, but should still not be forwarded
if len(keypoints) == 0:
self.__local_logger.info(f"{time.time()}: No brightspots detected.")
return False, None

# Annotate the image (green circle) with detected keypoints
image_annotated = cv2.drawKeypoints(
image, keypoints, None, (0, 255, 0), cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS
)

# Process bright spot detection
result, detections = detections_and_time.DetectionsAndTime.create(data.timestamp)
if not result:
self.__local_logger.error(f"{time.time()}: Failed to create detections for image.")
return False, None

# Get Pylance to stop complaining
assert detections is not None

# Draw bounding boxes around detected keypoints
for keypoint in keypoints:
x, y = keypoint.pt
size = keypoint.size
bounds = np.array([x - size / 2, y - size / 2, x + size / 2, y + size / 2])
result, detection = detections_and_time.Detection.create(
bounds, DETECTION_LABEL, CONFIDENCE
)
if not result:
self.__local_logger.error(f"{time.time()}: Failed to create bounding boxes.")
return False, None

# Get Pylance to stop complaining
assert detections is not None

detections.append(detection)

# Logging is identical to detect_target_ultralytics.py
# pylint: disable=duplicate-code
end_time = time.time()

# Logging
self.__local_logger.info(
f"{time.time()}: Count: {self.__counter}. Target detection took {end_time - start_time} seconds. Objects detected: {detections}."
)

if self.__filename_prefix != "":
filename = self.__filename_prefix + str(self.__counter)

# Annotated image
cv2.imwrite(filename + ".png", image_annotated) # type: ignore

self.__counter += 1

if self.__show_annotations:
cv2.imshow("Annotated", image_annotated) # type: ignore

# pylint: enable=duplicate-code

return True, detections
19 changes: 18 additions & 1 deletion modules/detect_target/detect_target_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
"""

import enum
import torch

from . import base_detect_target
from . import detect_target_brightspot
from . import detect_target_ultralytics
from ..common.modules.logger import logger

Expand All @@ -15,6 +17,7 @@ class DetectTargetOption(enum.Enum):
"""

ML_ULTRALYTICS = 0
CV_BRIGHTSPOT = 1


def create_detect_target(
Expand All @@ -27,8 +30,16 @@ def create_detect_target(
save_name: str,
) -> tuple[bool, base_detect_target.BaseDetectTarget | None]:
"""
Construct detect target class at runtime.
Factory function to create a detection target object.

Return:
Success, detect target object.
"""
# Fall back to CPU if no GPU is available
if device != "cpu" and not torch.cuda.is_available():
local_logger.warning("CUDA not available. Falling back to CPU.")
device = "cpu"

match detect_target_option:
case DetectTargetOption.ML_ULTRALYTICS:
return True, detect_target_ultralytics.DetectTargetUltralytics(
Expand All @@ -39,5 +50,11 @@ def create_detect_target(
show_annotations,
save_name,
)
case DetectTargetOption.CV_BRIGHTSPOT:
return True, detect_target_brightspot.DetectTargetBrightspot(
local_logger,
show_annotations,
save_name,
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restore empty line.

return False, None
17 changes: 15 additions & 2 deletions modules/detect_target/detect_target_ultralytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import time

import cv2
import torch
import ultralytics


from . import base_detect_target
from .. import image_and_time
from .. import detections_and_time
Expand Down Expand Up @@ -37,14 +39,18 @@ def __init__(
self.__device = device
self.__model = ultralytics.YOLO(model_path)
self.__counter = 0
self.__enable_half_precision = not self.__device == "cpu"
self.__enable_half_precision = self.__device != "cpu"
self.__local_logger = local_logger
self.__show_annotations = show_annotations
if override_full:
self.__enable_half_precision = False
self.__filename_prefix = ""
if save_name != "":
self.__filename_prefix = save_name + "_" + str(int(time.time())) + "_"

if self.__device != "cpu" and not torch.cuda.is_available():
self.__local_logger.warning("CUDA not available. Falling back to CPU.")
self.__device = "cpu"
Comment on lines +51 to +53
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add empty line above this line.


def run(
self, data: image_and_time.ImageAndTime
Expand Down Expand Up @@ -111,6 +117,13 @@ def run(
self.__counter += 1

if self.__show_annotations:
cv2.imshow("Annotated", image_annotated) # type: ignore
if image_annotated is None:
self.__local_logger.error("Annotated image is invalid.")
return False, detections


# Display the annotated image in a named window
cv2.imshow("Annotated", image_annotated)
cv2.waitKey(1) # Short delay to process GUI events

return True, detections
Loading
Loading