Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Global constants for main with CUDA

queue_max_size: 10
log_timings: false # Enable logging of setup and iteration times of workers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a nitpick: add 2 spaces between end of line and the comment


video_input:
worker_period: 1.0 # seconds
Expand Down
10 changes: 8 additions & 2 deletions main_2025.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def main() -> int:
# Local constants
# pylint: disable=invalid-name
QUEUE_MAX_SIZE = config["queue_max_size"]
LOG_TIMINGS = config["log_timings"]

VIDEO_INPUT_WORKER_PERIOD = config["video_input"]["worker_period"]
VIDEO_INPUT_OPTION = camera_factory.CameraOption(config["video_input"]["camera_enum"])
Expand Down Expand Up @@ -248,6 +249,7 @@ def main() -> int:
VIDEO_INPUT_CAMERA_CONFIG,
VIDEO_INPUT_IMAGE_NAME,
VIDEO_INPUT_WORKER_PERIOD,
LOG_TIMINGS,
),
input_queues=[],
output_queues=[video_input_to_detect_target_queue],
Expand All @@ -269,6 +271,7 @@ def main() -> int:
DETECT_TARGET_SHOW_ANNOTATED,
DETECT_TARGET_OPTION,
DETECT_TARGET_CONFIG,
LOG_TIMINGS,
),
input_queues=[video_input_to_detect_target_queue],
output_queues=[detect_target_to_data_merge_queue],
Expand All @@ -290,6 +293,7 @@ def main() -> int:
FLIGHT_INTERFACE_TIMEOUT,
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
LOG_TIMINGS,
),
input_queues=[
flight_interface_decision_queue,
Expand All @@ -312,7 +316,7 @@ def main() -> int:
result, data_merge_worker_properties = worker_manager.WorkerProperties.create(
count=1,
target=data_merge_worker.data_merge_worker,
work_arguments=(DATA_MERGE_TIMEOUT,),
work_arguments=(DATA_MERGE_TIMEOUT, LOG_TIMINGS),
input_queues=[
detect_target_to_data_merge_queue,
flight_interface_to_data_merge_queue,
Expand All @@ -334,6 +338,7 @@ def main() -> int:
work_arguments=(
camera_intrinsics,
camera_extrinsics,
LOG_TIMINGS,
),
input_queues=[data_merge_to_geolocation_queue],
output_queues=[geolocation_to_cluster_estimation_queue],
Expand All @@ -355,6 +360,7 @@ def main() -> int:
MIN_NEW_POINTS_TO_RUN,
MAX_NUM_COMPONENTS,
RANDOM_STATE,
LOG_TIMINGS,
MIN_POINTS_PER_CLUSTER,
),
input_queues=[geolocation_to_cluster_estimation_queue],
Expand All @@ -372,7 +378,7 @@ def main() -> int:
result, communications_worker_properties = worker_manager.WorkerProperties.create(
count=1,
target=communications_worker.communications_worker,
work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD),
work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD, LOG_TIMINGS),
input_queues=[
flight_interface_to_communications_queue,
cluster_estimation_to_communications_queue,
Expand Down
32 changes: 20 additions & 12 deletions modules/cluster_estimation/cluster_estimation_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def cluster_estimation_worker(
min_new_points_to_run: int,
max_num_components: int,
random_state: int,
log_timings: bool,
min_points_per_cluster: int,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
Expand All @@ -40,6 +41,9 @@ def cluster_estimation_worker(
random_state: int
Seed for randomizer, to get consistent results.

log_timings: bool
Whether to log setup and iteration times.

input_queue: queue_proxy_wrapper.QueuePRoxyWrapper
Data queue.

Expand All @@ -49,7 +53,7 @@ def cluster_estimation_worker(
worker_controller: worker_controller.WorkerController
How the main process communicates to this worker process.
"""
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand Down Expand Up @@ -77,14 +81,16 @@ def cluster_estimation_worker(
# Get Pylance to stop complaining
assert estimator is not None

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
# Logging and controller is identical to detect_target_worker.py
# pylint: disable=duplicate-code
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand All @@ -93,6 +99,8 @@ def cluster_estimation_worker(
local_logger.info("Recieved type None, exiting.")
break

# pylint: enable=duplicate-code

is_invalid = False

for single_input in input_data:
Expand All @@ -113,8 +121,8 @@ def cluster_estimation_worker(

output_queue.queue.put(value)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
25 changes: 13 additions & 12 deletions modules/communications/communications_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
def communications_worker(
timeout: float,
period: float,
log_timings: bool,
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
Expand All @@ -30,7 +31,7 @@ def communications_worker(
input_queue and output_queue are data queues.
controller is how the main process communicates to this worker process.
"""
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand Down Expand Up @@ -61,14 +62,14 @@ def communications_worker(
# Get Pylance to stop complaining
assert comm is not None

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand Down Expand Up @@ -105,8 +106,8 @@ def communications_worker(
output_queue.queue.put(message)
message_output_queue.queue.put(message)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
25 changes: 13 additions & 12 deletions modules/data_merge/data_merge_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

def data_merge_worker(
timeout: float,
log_timings: bool,
detections_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
odometry_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
Expand All @@ -32,7 +33,7 @@ def data_merge_worker(
Merge work is done in the worker process as the queues and control mechanisms
are naturally available.
"""
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand All @@ -55,14 +56,14 @@ def data_merge_worker(
local_logger.error("Queue timed out on startup", True)
return

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand Down Expand Up @@ -119,8 +120,8 @@ def data_merge_worker(

output_queue.queue.put(merged)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
29 changes: 17 additions & 12 deletions modules/detect_target/detect_target_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def detect_target_worker(
detect_target_brightspot.DetectTargetBrightspotConfig
| detect_target_ultralytics.DetectTargetUltralyticsConfig
),
log_timings: bool,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
Expand All @@ -35,7 +36,7 @@ def detect_target_worker(
input_queue and output_queue are data queues.
controller is how the main process communicates to this worker process.
"""
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand Down Expand Up @@ -63,14 +64,16 @@ def detect_target_worker(
# Get Pylance to stop complaining
assert detector is not None

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
# Logging and controller is identical to cluster_estimation_worker.py
# pylint: disable=duplicate-code
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand All @@ -79,6 +82,8 @@ def detect_target_worker(
local_logger.info("Recieved type None, exiting.")
break

# pylint: enable=duplicate-code

if not isinstance(input_data, image_and_time.ImageAndTime):
local_logger.warning(f"Skipping unexpected input: {input_data}")
continue
Expand All @@ -89,8 +94,8 @@ def detect_target_worker(

output_queue.queue.put(value)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
25 changes: 13 additions & 12 deletions modules/flight_interface/flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def flight_interface_worker(
timeout: float,
baud_rate: int,
period: float,
log_timings: bool,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
Expand All @@ -33,7 +34,7 @@ def flight_interface_worker(
controller is how the main process communicates to this worker process.
"""
# TODO: Error handling
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand All @@ -60,14 +61,14 @@ def flight_interface_worker(
home_position = interface.get_home_position()
communications_output_queue.queue.put(home_position)

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand All @@ -90,8 +91,8 @@ def flight_interface_worker(
# Pass the decision command to the flight controller
interface.apply_decision(command)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
Loading
Loading