From 00d46c8ff44f16887b1d82c9defe7b0926f7a8a7 Mon Sep 17 00:00:00 2001 From: Siddh Patel <47033295+siddhp1@users.noreply.github.com> Date: Mon, 24 Mar 2025 16:14:31 -0400 Subject: [PATCH] Add config for enabling logging of worker timings --- config.yaml | 1 + main_2025.py | 10 ++++-- .../cluster_estimation_worker.py | 32 ++++++++++++------- .../communications/communications_worker.py | 25 ++++++++------- modules/data_merge/data_merge_worker.py | 25 ++++++++------- modules/detect_target/detect_target_worker.py | 29 ++++++++++------- .../flight_interface_worker.py | 25 ++++++++------- modules/geolocation/geolocation_worker.py | 25 ++++++++------- modules/video_input/video_input_worker.py | 25 ++++++++------- 9 files changed, 111 insertions(+), 86 deletions(-) diff --git a/config.yaml b/config.yaml index 9fb50b13..e57ab8c1 100644 --- a/config.yaml +++ b/config.yaml @@ -1,6 +1,7 @@ # Global constants for main with CUDA queue_max_size: 10 +log_timings: false # Enable logging of setup and iteration times of workers video_input: worker_period: 1.0 # seconds diff --git a/main_2025.py b/main_2025.py index 81383697..3dfae806 100644 --- a/main_2025.py +++ b/main_2025.py @@ -85,6 +85,7 @@ def main() -> int: # Local constants # pylint: disable=invalid-name QUEUE_MAX_SIZE = config["queue_max_size"] + LOG_TIMINGS = config["log_timings"] VIDEO_INPUT_WORKER_PERIOD = config["video_input"]["worker_period"] VIDEO_INPUT_OPTION = camera_factory.CameraOption(config["video_input"]["camera_enum"]) @@ -247,6 +248,7 @@ def main() -> int: VIDEO_INPUT_CAMERA_CONFIG, VIDEO_INPUT_IMAGE_NAME, VIDEO_INPUT_WORKER_PERIOD, + LOG_TIMINGS, ), input_queues=[], output_queues=[video_input_to_detect_target_queue], @@ -268,6 +270,7 @@ def main() -> int: DETECT_TARGET_SHOW_ANNOTATED, DETECT_TARGET_OPTION, DETECT_TARGET_CONFIG, + LOG_TIMINGS, ), input_queues=[video_input_to_detect_target_queue], output_queues=[detect_target_to_data_merge_queue], @@ -289,6 +292,7 @@ def main() -> int: FLIGHT_INTERFACE_TIMEOUT, FLIGHT_INTERFACE_BAUD_RATE, FLIGHT_INTERFACE_WORKER_PERIOD, + LOG_TIMINGS, ), input_queues=[ flight_interface_decision_queue, @@ -311,7 +315,7 @@ def main() -> int: result, data_merge_worker_properties = worker_manager.WorkerProperties.create( count=1, target=data_merge_worker.data_merge_worker, - work_arguments=(DATA_MERGE_TIMEOUT,), + work_arguments=(DATA_MERGE_TIMEOUT, LOG_TIMINGS), input_queues=[ detect_target_to_data_merge_queue, flight_interface_to_data_merge_queue, @@ -333,6 +337,7 @@ def main() -> int: work_arguments=( camera_intrinsics, camera_extrinsics, + LOG_TIMINGS, ), input_queues=[data_merge_to_geolocation_queue], output_queues=[geolocation_to_cluster_estimation_queue], @@ -354,6 +359,7 @@ def main() -> int: MIN_NEW_POINTS_TO_RUN, MAX_NUM_COMPONENTS, RANDOM_STATE, + LOG_TIMINGS, ), input_queues=[geolocation_to_cluster_estimation_queue], output_queues=[cluster_estimation_to_communications_queue], @@ -370,7 +376,7 @@ def main() -> int: result, communications_worker_properties = worker_manager.WorkerProperties.create( count=1, target=communications_worker.communications_worker, - work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD), + work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD, LOG_TIMINGS), input_queues=[ flight_interface_to_communications_queue, cluster_estimation_to_communications_queue, diff --git a/modules/cluster_estimation/cluster_estimation_worker.py b/modules/cluster_estimation/cluster_estimation_worker.py index 3b204316..cac54249 100644 --- a/modules/cluster_estimation/cluster_estimation_worker.py +++ b/modules/cluster_estimation/cluster_estimation_worker.py @@ -18,6 +18,7 @@ def cluster_estimation_worker( min_new_points_to_run: int, max_num_components: int, random_state: int, + log_timings: bool, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, @@ -39,6 +40,9 @@ def cluster_estimation_worker( random_state: int Seed for randomizer, to get consistent results. + log_timings: bool + Whether to log setup and iteration times. + input_queue: queue_proxy_wrapper.QueuePRoxyWrapper Data queue. @@ -48,7 +52,7 @@ def cluster_estimation_worker( worker_controller: worker_controller.WorkerController How the main process communicates to this worker process. """ - setup_start_time = time.time() + setup_start_time = time.time() if log_timings else None worker_name = pathlib.Path(__file__).stem process_id = os.getpid() @@ -75,14 +79,16 @@ def cluster_estimation_worker( # Get Pylance to stop complaining assert estimator is not None - setup_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." - ) + # Logging and controller is identical to detect_target_worker.py + # pylint: disable=duplicate-code + if log_timings: + setup_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." + ) while not controller.is_exit_requested(): - iteration_start_time = time.time() + iteration_start_time = time.time() if log_timings else None controller.check_pause() @@ -91,6 +97,8 @@ def cluster_estimation_worker( local_logger.info("Recieved type None, exiting.") break + # pylint: enable=duplicate-code + is_invalid = False for single_input in input_data: @@ -111,8 +119,8 @@ def cluster_estimation_worker( output_queue.queue.put(value) - iteration_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." - ) + if log_timings: + iteration_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." + ) diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index f3fe99c8..15805cf7 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -17,6 +17,7 @@ def communications_worker( timeout: float, period: float, + log_timings: bool, home_position_queue: queue_proxy_wrapper.QueueProxyWrapper, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, @@ -30,7 +31,7 @@ def communications_worker( input_queue and output_queue are data queues. controller is how the main process communicates to this worker process. """ - setup_start_time = time.time() + setup_start_time = time.time() if log_timings else None worker_name = pathlib.Path(__file__).stem process_id = os.getpid() @@ -61,14 +62,14 @@ def communications_worker( # Get Pylance to stop complaining assert comm is not None - setup_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." - ) + if log_timings: + setup_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." + ) while not controller.is_exit_requested(): - iteration_start_time = time.time() + iteration_start_time = time.time() if log_timings else None controller.check_pause() @@ -105,8 +106,8 @@ def communications_worker( output_queue.queue.put(message) message_output_queue.queue.put(message) - iteration_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." - ) + if log_timings: + iteration_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." + ) diff --git a/modules/data_merge/data_merge_worker.py b/modules/data_merge/data_merge_worker.py index 4ffd6073..2dfe724f 100644 --- a/modules/data_merge/data_merge_worker.py +++ b/modules/data_merge/data_merge_worker.py @@ -17,6 +17,7 @@ def data_merge_worker( timeout: float, + log_timings: bool, detections_input_queue: queue_proxy_wrapper.QueueProxyWrapper, odometry_input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, @@ -32,7 +33,7 @@ def data_merge_worker( Merge work is done in the worker process as the queues and control mechanisms are naturally available. """ - setup_start_time = time.time() + setup_start_time = time.time() if log_timings else None worker_name = pathlib.Path(__file__).stem process_id = os.getpid() @@ -55,14 +56,14 @@ def data_merge_worker( local_logger.error("Queue timed out on startup", True) return - setup_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." - ) + if log_timings: + setup_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." + ) while not controller.is_exit_requested(): - iteration_start_time = time.time() + iteration_start_time = time.time() if log_timings else None controller.check_pause() @@ -119,8 +120,8 @@ def data_merge_worker( output_queue.queue.put(merged) - iteration_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." - ) + if log_timings: + iteration_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." + ) diff --git a/modules/detect_target/detect_target_worker.py b/modules/detect_target/detect_target_worker.py index 81cf1215..0cec6944 100644 --- a/modules/detect_target/detect_target_worker.py +++ b/modules/detect_target/detect_target_worker.py @@ -23,6 +23,7 @@ def detect_target_worker( detect_target_brightspot.DetectTargetBrightspotConfig | detect_target_ultralytics.DetectTargetUltralyticsConfig ), + log_timings: bool, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, @@ -35,7 +36,7 @@ def detect_target_worker( input_queue and output_queue are data queues. controller is how the main process communicates to this worker process. """ - setup_start_time = time.time() + setup_start_time = time.time() if log_timings else None worker_name = pathlib.Path(__file__).stem process_id = os.getpid() @@ -63,14 +64,16 @@ def detect_target_worker( # Get Pylance to stop complaining assert detector is not None - setup_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." - ) + # Logging and controller is identical to cluster_estimation_worker.py + # pylint: disable=duplicate-code + if log_timings: + setup_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." + ) while not controller.is_exit_requested(): - iteration_start_time = time.time() + iteration_start_time = time.time() if log_timings else None controller.check_pause() @@ -79,6 +82,8 @@ def detect_target_worker( local_logger.info("Recieved type None, exiting.") break + # pylint: enable=duplicate-code + if not isinstance(input_data, image_and_time.ImageAndTime): local_logger.warning(f"Skipping unexpected input: {input_data}") continue @@ -89,8 +94,8 @@ def detect_target_worker( output_queue.queue.put(value) - iteration_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." - ) + if log_timings: + iteration_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." + ) diff --git a/modules/flight_interface/flight_interface_worker.py b/modules/flight_interface/flight_interface_worker.py index 579a91cf..1b420389 100644 --- a/modules/flight_interface/flight_interface_worker.py +++ b/modules/flight_interface/flight_interface_worker.py @@ -18,6 +18,7 @@ def flight_interface_worker( timeout: float, baud_rate: int, period: float, + log_timings: bool, input_queue: queue_proxy_wrapper.QueueProxyWrapper, coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, @@ -33,7 +34,7 @@ def flight_interface_worker( controller is how the main process communicates to this worker process. """ # TODO: Error handling - setup_start_time = time.time() + setup_start_time = time.time() if log_timings else None worker_name = pathlib.Path(__file__).stem process_id = os.getpid() @@ -60,14 +61,14 @@ def flight_interface_worker( home_position = interface.get_home_position() communications_output_queue.queue.put(home_position) - setup_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." - ) + if log_timings: + setup_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." + ) while not controller.is_exit_requested(): - iteration_start_time = time.time() + iteration_start_time = time.time() if log_timings else None controller.check_pause() @@ -90,8 +91,8 @@ def flight_interface_worker( # Pass the decision command to the flight controller interface.apply_decision(command) - iteration_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." - ) + if log_timings: + iteration_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." + ) diff --git a/modules/geolocation/geolocation_worker.py b/modules/geolocation/geolocation_worker.py index 6f4b2338..fd8ea525 100644 --- a/modules/geolocation/geolocation_worker.py +++ b/modules/geolocation/geolocation_worker.py @@ -17,6 +17,7 @@ def geolocation_worker( camera_intrinsics: camera_properties.CameraIntrinsics, camera_drone_extrinsics: camera_properties.CameraDroneExtrinsics, + log_timings: bool, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, @@ -28,7 +29,7 @@ def geolocation_worker( controller is how the main process communicates to this worker process. """ # TODO: Handle errors better - setup_start_time = time.time() + setup_start_time = time.time() if log_timings else None worker_name = pathlib.Path(__file__).stem process_id = os.getpid() @@ -53,14 +54,14 @@ def geolocation_worker( # Get Pylance to stop complaining assert locator is not None - setup_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." - ) + if log_timings: + setup_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." + ) while not controller.is_exit_requested(): - iteration_start_time = time.time() + iteration_start_time = time.time() if log_timings else None controller.check_pause() @@ -79,8 +80,8 @@ def geolocation_worker( output_queue.queue.put(value) - iteration_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." - ) + if log_timings: + iteration_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." + ) diff --git a/modules/video_input/video_input_worker.py b/modules/video_input/video_input_worker.py index df30c1ce..abcc364a 100644 --- a/modules/video_input/video_input_worker.py +++ b/modules/video_input/video_input_worker.py @@ -22,6 +22,7 @@ def video_input_worker( camera_config: camera_opencv.ConfigOpenCV | camera_picamera2.ConfigPiCamera2, maybe_image_name: str | None, period: float, + log_timings: bool, output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: @@ -32,7 +33,7 @@ def video_input_worker( output_queue is the data queue. controller is how the main process communicates to this worker process. """ - setup_start_time = time.time() + setup_start_time = time.time() if log_timings else None worker_name = pathlib.Path(__file__).stem process_id = os.getpid() @@ -55,14 +56,14 @@ def video_input_worker( # Get Pylance to stop complaining assert input_device is not None - setup_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." - ) + if log_timings: + setup_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds." + ) while not controller.is_exit_requested(): - iteration_start_time = time.time() + iteration_start_time = time.time() if log_timings else None controller.check_pause() @@ -74,8 +75,8 @@ def video_input_worker( output_queue.queue.put(value) - iteration_end_time = time.time() - - local_logger.info( - f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." - ) + if log_timings: + iteration_end_time = time.time() + local_logger.info( + f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds." + )