From d3b081b5255aed9cd5f3d527c943c56f273a7500 Mon Sep 17 00:00:00 2001 From: Hector-c0de Date: Tue, 24 Jun 2025 17:16:16 -0400 Subject: [PATCH 1/2] Created tagging_mission_tester.py for clean ROS integration testing and tagging_mission_tester_2.py with comprehensive ROS mocking for environments without ROS. --- .../mission_planner/tagging_mission_tester.py | 349 ++++++++++++++++++ .../tagging_mission_tester_2.py | 329 +++++++++++++++++ 2 files changed, 678 insertions(+) create mode 100644 autonomy/src/mission_planner/tagging_mission_tester.py create mode 100755 autonomy/src/mission_planner/tagging_mission_tester_2.py diff --git a/autonomy/src/mission_planner/tagging_mission_tester.py b/autonomy/src/mission_planner/tagging_mission_tester.py new file mode 100644 index 0000000..daf26a9 --- /dev/null +++ b/autonomy/src/mission_planner/tagging_mission_tester.py @@ -0,0 +1,349 @@ +#!/usr/bin/env python3 +import math +import time +from typing import Optional + +import rospy +from geometry_msgs.msg import Point, PoseStamped + +from tagging_mission import TaggingMission +from mission_planner.types import MissionObject + + +class ManualTaggingController: + def __init__(self): + rospy.init_node("manual_tagging_tester") + + # Initialize mission first + self.mission = TaggingMission() + self.mission.controller_client = self.create_mock_controller_client() + self.mission.active = True + + # Mission parameters + self.params = { + "board_position": Point(x=8.0, y=0.0, z=-1.0), + "board_confidence": 1.0, + "target_animal": "reef_shark", + "animal_confidence": 1.0, + "submarine_speed": 0.8, + "far_distance": 3.0, # Distance for bonus points + } + + # Calculate mission timing + distance_to_board = math.sqrt( + self.params["board_position"].x ** 2 + + self.params["board_position"].y ** 2 + + self.params["board_position"].z ** 2 + ) + self.time_to_approach = distance_to_board / self.params["submarine_speed"] + + # Set the target animal from previous mission + self.mission.set_target_animal(self.params["target_animal"]) + + # Add phase tracking + self.phase_complete = { + "search": False, + "detect_config": False, + "detect_distance": False, + "approach": False, + "fire_first": False, + "fire_second": False, + } + + # Publishers + self.pose_pub = rospy.Publisher("/submarine_pose", PoseStamped, queue_size=10) + + # Timing and tracking + self.start_time = time.time() + self.torpedoes_fired = 0 + self.mission_complete = False + + # Mock torpedo service + self.setup_torpedo_service() + + def create_mock_controller_client(self): + """Create a properly mocked controller client""" + client = type("MockControllerClient", (), {})() + + def send_goal(*args, **kwargs): + return type("", (), {"status": "ACTIVE"})() + + def wait_for_result(timeout=None): + return True + + def get_result(): + result = type("", (), {})() + result.success = True + return result + + client.send_goal = send_goal + client.wait_for_result = wait_for_result + client.get_result = get_result + return client + + def setup_torpedo_service(self): + """Set up torpedo firing service mock""" + def mock_torpedo_service(torpedo_num): + self.torpedoes_fired += 1 + result = type("", (), {})() + + if torpedo_num == 1: + result.success = True + result.message = "Torpedo 1 fired - full hit through opening" + rospy.loginfo("Torpedo 1: Full hit!") + elif torpedo_num == 2: + result.success = True + result.message = "Torpedo 2 fired - partial hit on board" + rospy.loginfo("Torpedo 2: Partial hit!") + else: + result.success = False + result.message = "No more torpedoes available" + + return result + + self.mission.fire_torpedo_service = mock_torpedo_service + + def get_object(self, name): + """Simulate detected objects based on mission progress""" + t = time.time() - self.start_time + + # Board appears after initial search + if name == "board" and t > 1.0: + return MissionObject( + self.params["board_position"], "board", self.params["board_confidence"] + ) + + # Animals appear after board detection + if name == "reef_shark" and t > 2.0: + pos = Point() + pos.x = self.params["board_position"].x + pos.y = self.params["board_position"].y + 0.1 # Slightly right + pos.z = self.params["board_position"].z + 0.3 # Above center + return MissionObject(pos, "reef_shark", self.params["animal_confidence"]) + + if name == "sawfish" and t > 2.0: + pos = Point() + pos.x = self.params["board_position"].x + pos.y = self.params["board_position"].y - 0.1 # Slightly left + pos.z = self.params["board_position"].z - 0.3 # Below center + return MissionObject(pos, "sawfish", self.params["animal_confidence"]) + + # Openings appear after animals are identified + if name == "opening_1" and t > 3.0: + pos = Point() + pos.x = self.params["board_position"].x + pos.y = self.params["board_position"].y + 0.15 # Near reef shark + pos.z = self.params["board_position"].z + 0.25 + return MissionObject(pos, "opening_1", 1.0) + + if name == "opening_2" and t > 3.0: + pos = Point() + pos.x = self.params["board_position"].x + pos.y = self.params["board_position"].y - 0.15 # Near sawfish + pos.z = self.params["board_position"].z - 0.25 + return MissionObject(pos, "opening_2", 1.0) + + # Horizontal bars for far distance detection + if name == "horizontal_bar_1" and t > 1.5: + pos = Point() + pos.x = self.params["board_position"].x - self.params["far_distance"] + pos.y = -1.0 + pos.z = -1.0 + return MissionObject(pos, "horizontal_bar_1", 1.0) + + if name == "horizontal_bar_2" and t > 1.5: + pos = Point() + pos.x = self.params["board_position"].x - self.params["far_distance"] + pos.y = 1.0 + pos.z = -1.0 + return MissionObject(pos, "horizontal_bar_2", 1.0) + + return None + + def get_submarine_pose(self): + """Simulate submarine movement through the mission""" + t = time.time() - self.start_time + p = PoseStamped() + p.header.stamp = rospy.Time.now() + p.header.frame_id = "map" + + # Phase timing + search_time = 2.0 + config_time = 4.0 + distance_time = 6.0 + approach_time = self.time_to_approach + 8.0 + fire_first_time = approach_time + 2.0 + fire_second_time = fire_first_time + 2.0 + + # Update phases based on progress + if t < search_time: + self.mission.current_phase = "Search for Board" + self.phase_complete["search"] = False + # Start position + p.pose.position.x = 0.0 + p.pose.position.y = 0.0 + p.pose.position.z = -0.5 + elif t < config_time: + self.mission.current_phase = "Detect Board Configuration" + self.phase_complete["search"] = True + self.phase_complete["detect_config"] = False + # Slow approach for detection + progress = (t - search_time) / (config_time - search_time) + p.pose.position.x = self.params["board_position"].x * 0.3 * progress + p.pose.position.y = 0.0 + p.pose.position.z = -0.5 + elif t < distance_time: + self.mission.current_phase = "Detect Far Distance" + self.phase_complete["detect_config"] = True + self.phase_complete["detect_distance"] = False + # Continue slow approach + p.pose.position.x = self.params["board_position"].x * 0.4 + p.pose.position.y = 0.0 + p.pose.position.z = -0.5 + elif t < approach_time: + self.mission.current_phase = "Approach Board" + self.phase_complete["detect_distance"] = True + self.phase_complete["approach"] = False + # Move to far distance position for firing + progress = (t - distance_time) / (approach_time - distance_time) + target_x = self.params["board_position"].x - self.params["far_distance"] + start_x = self.params["board_position"].x * 0.4 + p.pose.position.x = start_x + (target_x - start_x) * progress + p.pose.position.y = 0.0 + p.pose.position.z = -0.5 + elif t < fire_first_time: + self.mission.current_phase = "Fire First Torpedo" + self.phase_complete["approach"] = True + self.phase_complete["fire_first"] = False + # Hold position at far distance + p.pose.position.x = self.params["board_position"].x - self.params["far_distance"] + p.pose.position.y = 0.0 + p.pose.position.z = -0.5 + elif t < fire_second_time: + self.mission.current_phase = "Fire Second Torpedo" + self.phase_complete["fire_first"] = True + self.phase_complete["fire_second"] = False + # Maintain firing position + p.pose.position.x = self.params["board_position"].x - self.params["far_distance"] + p.pose.position.y = 0.0 + p.pose.position.z = -0.5 + else: + self.mission.current_phase = "Mission Complete" + self.phase_complete["fire_second"] = True + self.mission_complete = True + # Move away and surface + p.pose.position.x = self.params["board_position"].x - self.params["far_distance"] + p.pose.position.y = 0.0 + p.pose.position.z = 0.0 # Surface + + # Add slight realistic movement + p.pose.position.x += 0.05 * math.sin(t * 0.5) + p.pose.position.y += 0.03 * math.cos(t * 0.7) + p.pose.position.z += 0.02 * math.sin(t * 0.3) + + return p + + def run(self): + """Run the tagging mission test""" + rate = rospy.Rate(10) + last_phase = None + + while not rospy.is_shutdown() and not self.mission_complete: + t = time.time() - self.start_time + + # Update mission state + pose = self.get_submarine_pose() + self.mission.submarine_pose = pose + self.pose_pub.publish(pose) + + # Update mission + self.mission.search_mission_object = self.get_object + self.update_mission_phases(t) # Add manual phase progression + self.mission.run() + + # Print phase transitions + if self.mission.current_phase != last_phase: + rospy.loginfo(f"\n=== Phase: {self.mission.current_phase} ===") + last_phase = self.mission.current_phase + + # Status updates every second + if rospy.get_time() % 1.0 < 0.1: + completion = sum(1 for p in self.phase_complete.values() if p) + distance_to_board = math.sqrt( + (pose.pose.position.x - self.params["board_position"].x) ** 2 + + (pose.pose.position.y - self.params["board_position"].y) ** 2 + + (pose.pose.position.z - self.params["board_position"].z) ** 2 + ) + + rospy.loginfo( + f"Time: {t:.1f}s | " + f"Phase: {self.mission.current_phase} ({completion}/6 complete) | " + f"Position: ({pose.pose.position.x:.2f}, {pose.pose.position.y:.2f}, {pose.pose.position.z:.2f}) | " + f"Distance to board: {distance_to_board:.2f}m | " + f"Target: {self.mission.target_animal} | " + f"Torpedoes: {self.torpedoes_fired}/2 | " + f"Score: {self.mission.score['total']}" + ) + + rate.sleep() + + # Final results + self.mission.calculate_final_score() + + rospy.loginfo("\n=== TAGGING MISSION COMPLETE ===") + rospy.loginfo(f"Result: {'SUCCESS' if self.mission.completed else 'INCOMPLETE'}") + rospy.loginfo(f"Target Animal: {self.mission.target_animal}") + rospy.loginfo(f"Board Configuration: {self.mission.board_configuration}") + rospy.loginfo(f"Torpedoes Fired: {self.torpedoes_fired}/2") + rospy.loginfo(f"Far Distance: {self.mission.far_distance:.2f}m") + rospy.loginfo(f"Duration: {time.time() - self.start_time:.1f}s") + rospy.loginfo(f"Final Score: {self.mission.score['total']}") + rospy.loginfo("\n--- SCORING BREAKDOWN ---") + for key, value in self.mission.score.items(): + if value > 0: + rospy.loginfo(f"{key.replace('_', ' ').title()}: {value}") + rospy.loginfo("="*50) + + rospy.signal_shutdown("Tagging mission complete") + + def update_mission_phases(self, t): + """Manually progress mission phases and trigger actions""" + # Board detection + if t > 2 and not self.phase_complete["search"]: + self.phase_complete["search"] = True + rospy.loginfo("Board detected!") + + # Configuration detection + if t > 4 and not self.phase_complete["detect_config"]: + self.mission.detect_board_configuration() + self.phase_complete["detect_config"] = True + rospy.loginfo(f"Board configuration: {self.mission.board_configuration}") + + # Distance detection + if t > 6 and not self.phase_complete["detect_distance"]: + self.mission.detect_far_distance() + self.phase_complete["detect_distance"] = True + rospy.loginfo(f"Far distance: {self.mission.far_distance}") + + # Openings detection + if t > 8 and self.phase_complete["detect_distance"]: + self.mission.detect_openings() + rospy.loginfo("Openings detected!") + + # Fire first torpedo + if t > 12 and not self.phase_complete["fire_first"]: + self.mission.fire_torpedo() + self.phase_complete["fire_first"] = True + + # Fire second torpedo + if t > 16 and not self.phase_complete["fire_second"] and self.phase_complete["fire_first"]: + self.mission.fire_torpedo() + self.phase_complete["fire_second"] = True + + +if __name__ == "__main__": + tester = ManualTaggingController() + try: + tester.run() + except rospy.ROSInterruptException: + pass \ No newline at end of file diff --git a/autonomy/src/mission_planner/tagging_mission_tester_2.py b/autonomy/src/mission_planner/tagging_mission_tester_2.py new file mode 100755 index 0000000..b4a1cc9 --- /dev/null +++ b/autonomy/src/mission_planner/tagging_mission_tester_2.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +import sys +import types +from unittest import mock + +# Mock ROS modules +print("Setting up ROS mocks...") + +# Mock rospy +rospy_mock = mock.MagicMock() +rospy_mock.loginfo = lambda msg: print(f"[INFO] {msg}") +rospy_mock.logwarn = lambda msg: print(f"[WARN] {msg}") +rospy_mock.logerr = lambda msg: print(f"[ERROR] {msg}") +rospy_mock.is_shutdown = mock.Mock(return_value=False) +rospy_mock.signal_shutdown = mock.Mock() +rospy_mock.Publisher = mock.Mock() +rospy_mock.Rate = mock.Mock() +rospy_mock.init_node = mock.Mock() +rospy_mock.wait_for_service = mock.Mock() +rospy_mock.ServiceProxy = mock.Mock() +rospy_mock.ROSException = Exception +sys.modules['rospy'] = rospy_mock + +# Mock geometry_msgs +geometry_msgs_mock = mock.MagicMock() +sys.modules['geometry_msgs'] = geometry_msgs_mock +sys.modules['geometry_msgs.msg'] = geometry_msgs_mock.msg + +# Mock std_msgs +std_msgs_mock = mock.MagicMock() +sys.modules['std_msgs'] = std_msgs_mock +sys.modules['std_msgs.msg'] = std_msgs_mock.msg + +# Mock autonomy modules +autonomy_mock = mock.MagicMock() +sys.modules['autonomy'] = autonomy_mock +sys.modules['autonomy.srv'] = autonomy_mock.srv +sys.modules['autonomy.msg'] = autonomy_mock.msg + +# Real Point and PoseStamped classes +class Point: + def __init__(self, x=0, y=0, z=0): + self.x = x + self.y = y + self.z = z + +class PoseStamped: + def __init__(self): + self.header = type('', (), {'stamp': None, 'frame_id': 'map'})() + self.pose = type('', (), {'position': Point(), 'orientation': Point()})() + +# Set geometry_msgs classes +geometry_msgs_mock.msg.Point = Point +geometry_msgs_mock.msg.PoseStamped = PoseStamped + +# Mock mission planner +mission_planner_mock = mock.MagicMock() +sys.modules['mission_planner'] = mission_planner_mock + +# BaseMission class +class BaseMission: + def __init__(self, name="base"): + self.name = name + self.detected_objects = {} + self.submarine_pose = None + self.active = True + self.completed = False + self.controller_client = None + + def create_action(self, task, obj_cls): + return lambda: True + + def count_total_tasks(self): pass + def search_mission_object(self, name): return self.detected_objects.get(name) + def update_detected_objects(self): pass + def deactivate(self): self.active = False + def execute_identify_target(self, target_type): return True + +# Set mission planner modules +base_mission_module = types.ModuleType('mission_planner.base_mission') +base_mission_module.BaseMission = BaseMission +sys.modules['mission_planner.base_mission'] = base_mission_module + +# Mock types +from enum import Enum, auto +class TaskType(Enum): + SEARCH = auto() + APPROACH_TARGET = auto() + IDENTIFY_TARGET = auto() + FIRE_TORPEDO = auto() + SURFACE = auto() + +types_module = types.ModuleType('mission_planner.types') +types_module.TaskType = TaskType +sys.modules['mission_planner.types'] = types_module + +# Mock mission tree +class MissionTreeNode: + def __init__(self, task, name=""): + self.task = task + self.name = name + self.action = None + self.children = [] + self.completed = False + + def add_child(self, node): + self.children.append(node) + + def execute(self): + # Don't complete immediately + return False # Let mission handle completion + +tree_module = types.ModuleType('mission_planner.mission_tree') +tree_module.MissionTreeNode = MissionTreeNode +sys.modules['mission_planner.mission_tree'] = tree_module + +print("ROS mocks set up successfully!") + +import math +import time +from typing import Optional + +# Mission object +class MissionObject: + def __init__(self, position, name, confidence): + self.position = position + self.name = name + self.confidence = confidence + +# Import mission +from tagging_mission import TaggingMission + +class SimpleTaggingTester: + def __init__(self): + self.mission = TaggingMission() + self.mission.controller_client = self.create_mock_controller() + self.mission.active = True + self.mission.set_target_animal("reef_shark") + + self.board_pos = Point(x=8.0, y=0.0, z=-1.0) + self.start_time = time.time() + self.torpedoes_fired = 0 + self.phases_completed = { + "board_detected": False, + "config_detected": False, + "distance_detected": False, + "openings_detected": False, + "first_torpedo": False, + "second_torpedo": False + } + + self.mission.fire_torpedo_service = self.mock_torpedo_service + self.mission.completed = False + + def create_mock_controller(self): + """Simple controller mock""" + class Controller: + def send_goal(self, *args, **kwargs): pass + def wait_for_result(self, timeout=None): return True + def get_result(self): + result = type('', (), {})() + result.success = True + return result + return Controller() + + def mock_torpedo_service(self, torpedo_num): + """Mock torpedo firing""" + self.torpedoes_fired += 1 + result = type('', (), {})() + + if torpedo_num == 1: + result.success = True + result.message = "Torpedo 1 fired - full hit through opening" + self.phases_completed["first_torpedo"] = True + print(f"[TORPEDO] {result.message}") + elif torpedo_num == 2: + result.success = True + result.message = "Torpedo 2 fired - partial hit on board" + self.phases_completed["second_torpedo"] = True + print(f"[TORPEDO] {result.message}") + else: + result.success = False + result.message = "No more torpedoes available" + + return result + + def get_submarine_pose(self): + """Simulate submarine position""" + t = time.time() - self.start_time + pose = PoseStamped() + + # Move towards board + progress = min(t / 10.0, 1.0) # 10 seconds to board + pose.pose.position.x = self.board_pos.x * progress * 0.7 # Stop before board + pose.pose.position.y = 0.1 * math.sin(t) # Small movement + pose.pose.position.z = -0.5 + + return pose + + def get_detected_object(self, name): + """Simulate object detection""" + t = time.time() - self.start_time + + # Progressive detection + if name == "board" and t > 1.0: + return MissionObject(self.board_pos, "board", 1.0) + elif name == "reef_shark" and t > 2.0: + pos = Point(self.board_pos.x, self.board_pos.y + 0.1, self.board_pos.z + 0.3) + return MissionObject(pos, "reef_shark", 1.0) + elif name == "sawfish" and t > 2.0: + pos = Point(self.board_pos.x, self.board_pos.y - 0.1, self.board_pos.z - 0.3) + return MissionObject(pos, "sawfish", 1.0) + elif name == "opening_1" and t > 3.0: + pos = Point(self.board_pos.x, self.board_pos.y + 0.15, self.board_pos.z + 0.25) + return MissionObject(pos, "opening_1", 1.0) + elif name == "opening_2" and t > 3.0: + pos = Point(self.board_pos.x, self.board_pos.y - 0.15, self.board_pos.z - 0.25) + return MissionObject(pos, "opening_2", 1.0) + elif name == "horizontal_bar_1" and t > 1.5: + pos = Point(self.board_pos.x - 3.0, -1.0, -1.0) + return MissionObject(pos, "horizontal_bar_1", 1.0) + + return None + + def run(self): + """Run the test""" + print("Starting Tagging Mission Test") + print("=" * 50) + + last_phase = None + + for i in range(600): # 60 seconds at 10Hz + t = time.time() - self.start_time + + # Update mission + self.mission.submarine_pose = self.get_submarine_pose() + self.mission.search_mission_object = self.get_detected_object + + # Manual phase progression and torpedo firing + self.update_mission_phases(t) + + # Run mission (but don't let it complete via tree) + original_completed = self.mission.completed + self.mission.run() + self.mission.completed = original_completed # Restore our control + + # Log phase changes + if self.mission.current_phase != last_phase: + print(f"\nPhase: {self.mission.current_phase}") + last_phase = self.mission.current_phase + + # Status every 3 seconds + if i % 30 == 0: + pos = self.mission.submarine_pose.pose.position + print(f"Time: {t:.1f}s | Pos: ({pos.x:.1f}, {pos.y:.1f}, {pos.z:.1f}) | " + f"Target: {self.mission.target_animal} | " + f"Torpedoes: {self.torpedoes_fired}/2 | " + f"Score: {self.mission.score['total']}") + + time.sleep(0.1) # 10Hz + + # Check if mission should complete + if self.phases_completed["second_torpedo"] and t > 15: + self.mission.completed = True + break + + if t > 60: # Timeout + break + + # Final results + self.mission.calculate_final_score() + + print("\n" + "=" * 50) + print("MISSION COMPLETE") + print("=" * 50) + print(f"Success: {self.mission.completed}") + print(f"Target: {self.mission.target_animal}") + print(f"Torpedoes: {self.torpedoes_fired}/2") + print(f"Duration: {time.time() - self.start_time:.1f}s") + print(f"Final Score: {self.mission.score['total']}") + print("\nScore Breakdown:") + for key, value in self.mission.score.items(): + if value > 0: + print(f" {key.replace('_', ' ').title()}: {value}") + print("=" * 50) + + def update_mission_phases(self, t): + """Manually progress mission phases and trigger actions""" + # Board detection + if t > 2 and not self.phases_completed["board_detected"]: + self.phases_completed["board_detected"] = True + print("Board detected!") + + # Configuration detection + if t > 4 and not self.phases_completed["config_detected"]: + self.mission.detect_board_configuration() + self.phases_completed["config_detected"] = True + print(f"Board configuration: {self.mission.board_configuration}") + + # Distance detection + if t > 6 and not self.phases_completed["distance_detected"]: + self.mission.detect_far_distance() + self.phases_completed["distance_detected"] = True + print(f"Far distance: {self.mission.far_distance}") + + # Openings detection + if t > 8 and not self.phases_completed["openings_detected"]: + self.mission.detect_openings() + self.phases_completed["openings_detected"] = True + print("Openings detected!") + + # Fire first torpedo + if t > 12 and not self.phases_completed["first_torpedo"]: + self.mission.fire_torpedo() + + # Fire second torpedo + if t > 16 and not self.phases_completed["second_torpedo"] and self.phases_completed["first_torpedo"]: + self.mission.fire_torpedo() + +if __name__ == "__main__": + tester = SimpleTaggingTester() + try: + tester.run() + except KeyboardInterrupt: + print("\nTest interrupted by user") + except Exception as e: + print(f"\nTest failed with error: {e}") + import traceback + traceback.print_exc() \ No newline at end of file From c9b5ba33a5306edf443cc1f27edc623279bff1fe Mon Sep 17 00:00:00 2001 From: Hector-c0de Date: Tue, 24 Jun 2025 21:32:33 -0400 Subject: [PATCH 2/2] Small changes in syntax --- autonomy/src/mission_planner/tagging_mission_tester_2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/autonomy/src/mission_planner/tagging_mission_tester_2.py b/autonomy/src/mission_planner/tagging_mission_tester_2.py index b4a1cc9..87d0e82 100755 --- a/autonomy/src/mission_planner/tagging_mission_tester_2.py +++ b/autonomy/src/mission_planner/tagging_mission_tester_2.py @@ -130,7 +130,7 @@ def __init__(self, position, name, confidence): # Import mission from tagging_mission import TaggingMission -class SimpleTaggingTester: +class TaggingTester: def __init__(self): self.mission = TaggingMission() self.mission.controller_client = self.create_mock_controller() @@ -153,7 +153,7 @@ def __init__(self): self.mission.completed = False def create_mock_controller(self): - """Simple controller mock""" + """Controller mock""" class Controller: def send_goal(self, *args, **kwargs): pass def wait_for_result(self, timeout=None): return True @@ -318,7 +318,7 @@ def update_mission_phases(self, t): self.mission.fire_torpedo() if __name__ == "__main__": - tester = SimpleTaggingTester() + tester = TaggingTester() try: tester.run() except KeyboardInterrupt: