From 9d07c1985558cf38656876f81768ea0fad40b114 Mon Sep 17 00:00:00 2001 From: AlejandroRoberts Date: Sat, 31 May 2025 12:36:48 -0400 Subject: [PATCH 1/2] prequalification tester first implementation --- .../mission/prequalification_tester.py | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 autonomy/scripts/mission/prequalification_tester.py diff --git a/autonomy/scripts/mission/prequalification_tester.py b/autonomy/scripts/mission/prequalification_tester.py new file mode 100644 index 0000000..a085281 --- /dev/null +++ b/autonomy/scripts/mission/prequalification_tester.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 + +import rospy +from std_msgs.msg import Int16 +from sensor_msgs.msg import Imu +from actionlib import SimpleActionClient +from autonomy.msg import NavigateToWaypointAction + +class PrequalificationTester: + def __init__(self): + rospy.init_node('prequalification_tester', anonymous=True) + + # Flags and storage + self.depth_received = False + self.imu_received = False + self.thruster_values = {} + + # Subscribers + rospy.Subscriber('/hydrus/depth', Int16, self.depth_callback) + rospy.Subscriber('/imu/data', Imu, self.imu_callback) + for i in range(1, 9): + rospy.Subscriber(f'/hydrus/thrusters/{i}', Int16, + lambda msg, idx=i: self.thruster_callback(msg, idx)) + + # Publishers + self.depth_pub = rospy.Publisher('/hydrus/depth', Int16, queue_size=10) + self.imu_pub = rospy.Publisher('/imu/data', Imu, queue_size=10) + self.thruster_pubs = { + i: rospy.Publisher(f'/hydrus/thrusters/{i}', Int16, queue_size=10) + for i in range(1, 9) + } + + # Action client + self.controller_client = SimpleActionClient('controller_action', NavigateToWaypointAction) + + def publish_mock_data(self): + # Publish mock depth + self.depth_pub.publish(Int16(data=25)) + rospy.loginfo("Mock depth published.") + + # Publish mock IMU + imu_msg = Imu() + imu_msg.header.stamp = rospy.Time.now() + imu_msg.orientation_covariance[0] = -1 # uncalibrated for test + self.imu_pub.publish(imu_msg) + rospy.loginfo("Mock IMU published.") + + # Publish mock thruster values + for i in range(1, 9): + self.thruster_pubs[i].publish(Int16(data=1500)) + rospy.loginfo("Mock thruster values published.") + + def depth_callback(self, msg): + self.depth_received = True + self.last_depth = msg.data + + def imu_callback(self, msg): + self.imu_received = True + + def thruster_callback(self, msg, idx): + self.thruster_values[idx] = msg.data + + def check_all(self): + rospy.loginfo("Checking controller action server...") + self.controller_client.wait_for_server(timeout=rospy.Duration(5)) + controller_up = self.controller_client.wait_for_server(rospy.Duration(1.0)) + + rospy.sleep(2) + + print("\n==== PREQUALIFICATION TEST RESULTS ====") + print(f"[{'✓' if self.depth_received else '✗'}] Depth sensor data received") + print(f"[{'✓' if self.imu_received else '✗'}] IMU data received") + print(f"[{'✓' if controller_up else '✗'}] Controller action server online") + + thruster_ok = all(abs(val - 1500) < 10 for val in self.thruster_values.values()) and len(self.thruster_values) == 8 + print(f"[{'✓' if thruster_ok else '✗'}] All thrusters neutral") + + if not thruster_ok: + for i in range(1, 9): + print(f"Thruster {i}: {self.thruster_values.get(i, 'N/A')}") + + print("========================================") + + def run(self): + rospy.sleep(1) + self.publish_mock_data() + rospy.sleep(3) + self.check_all() + + +if __name__ == '__main__': + try: + tester = PrequalificationTester() + tester.run() + except rospy.ROSInterruptException: + rospy.loginfo("Prequalification tester interrupted.") + +#How to Run code (Needs to be verified): +#roscore +#rosrun your_package prequalification_tester.py From 3a72fab84915fe17ba6e84cf0c0d39d151ec731b Mon Sep 17 00:00:00 2001 From: AlejandroRoberts Date: Sat, 7 Jun 2025 14:32:42 -0400 Subject: [PATCH 2/2] prequalification code tested --- .../mission/prequalification_tester.py | 117 +++++++++--------- 1 file changed, 57 insertions(+), 60 deletions(-) diff --git a/autonomy/scripts/mission/prequalification_tester.py b/autonomy/scripts/mission/prequalification_tester.py index a085281..3c5e70b 100644 --- a/autonomy/scripts/mission/prequalification_tester.py +++ b/autonomy/scripts/mission/prequalification_tester.py @@ -1,55 +1,37 @@ #!/usr/bin/env python3 import rospy +import random from std_msgs.msg import Int16 -from sensor_msgs.msg import Imu -from actionlib import SimpleActionClient -from autonomy.msg import NavigateToWaypointAction +from sensor_msgs.msg import Imu, Image +from geometry_msgs.msg import Point + +# Mock GateDetection message class +class GateDetection: + def __init__(self): + self.position = Point() + self.confidence = 0.0 class PrequalificationTester: def __init__(self): rospy.init_node('prequalification_tester', anonymous=True) - # Flags and storage self.depth_received = False self.imu_received = False + self.gate_detected = False + self.camera_connected = False + self.last_gate_position = None + self.gate_confidence = 0.0 self.thruster_values = {} - # Subscribers rospy.Subscriber('/hydrus/depth', Int16, self.depth_callback) rospy.Subscriber('/imu/data', Imu, self.imu_callback) + rospy.Subscriber('/zed2i/zed_node/image_raw', Image, self.camera_callback) + for i in range(1, 9): rospy.Subscriber(f'/hydrus/thrusters/{i}', Int16, lambda msg, idx=i: self.thruster_callback(msg, idx)) - # Publishers - self.depth_pub = rospy.Publisher('/hydrus/depth', Int16, queue_size=10) - self.imu_pub = rospy.Publisher('/imu/data', Imu, queue_size=10) - self.thruster_pubs = { - i: rospy.Publisher(f'/hydrus/thrusters/{i}', Int16, queue_size=10) - for i in range(1, 9) - } - - # Action client - self.controller_client = SimpleActionClient('controller_action', NavigateToWaypointAction) - - def publish_mock_data(self): - # Publish mock depth - self.depth_pub.publish(Int16(data=25)) - rospy.loginfo("Mock depth published.") - - # Publish mock IMU - imu_msg = Imu() - imu_msg.header.stamp = rospy.Time.now() - imu_msg.orientation_covariance[0] = -1 # uncalibrated for test - self.imu_pub.publish(imu_msg) - rospy.loginfo("Mock IMU published.") - - # Publish mock thruster values - for i in range(1, 9): - self.thruster_pubs[i].publish(Int16(data=1500)) - rospy.loginfo("Mock thruster values published.") - def depth_callback(self, msg): self.depth_received = True self.last_depth = msg.data @@ -57,44 +39,59 @@ def depth_callback(self, msg): def imu_callback(self, msg): self.imu_received = True + def camera_callback(self, msg): + self.camera_connected = True + def thruster_callback(self, msg, idx): self.thruster_values[idx] = msg.data - def check_all(self): - rospy.loginfo("Checking controller action server...") - self.controller_client.wait_for_server(timeout=rospy.Duration(5)) - controller_up = self.controller_client.wait_for_server(rospy.Duration(1.0)) - - rospy.sleep(2) + def gate_callback(self, msg): + if self.camera_connected: + self.gate_detected = True + self.last_gate_position = msg.position + self.gate_confidence = msg.confidence + else: + rospy.logwarn("Gate detection ignored — camera not connected.") + def check_all(self): print("\n==== PREQUALIFICATION TEST RESULTS ====") print(f"[{'✓' if self.depth_received else '✗'}] Depth sensor data received") print(f"[{'✓' if self.imu_received else '✗'}] IMU data received") - print(f"[{'✓' if controller_up else '✗'}] Controller action server online") - - thruster_ok = all(abs(val - 1500) < 10 for val in self.thruster_values.values()) and len(self.thruster_values) == 8 - print(f"[{'✓' if thruster_ok else '✗'}] All thrusters neutral") - if not thruster_ok: - for i in range(1, 9): - print(f"Thruster {i}: {self.thruster_values.get(i, 'N/A')}") + all_thrusters_ok = all(v == 1500 for v in self.thruster_values.values()) + print(f"[{'✓' if all_thrusters_ok else '✗'}] All thrusters neutral") + print(f"[{'✓' if self.camera_connected else '✗'}] Camera is connected and publishing") + print(f"[{'✓' if self.gate_detected else '✗'}] Gate detected by camera") + if self.gate_detected: + p = self.last_gate_position + print(f" ↳ Position: ({p.x:.2f}, {p.y:.2f}, {p.z:.2f})") + print(f" ↳ Confidence: {self.gate_confidence:.2f}") print("========================================") - def run(self): - rospy.sleep(1) - self.publish_mock_data() - rospy.sleep(3) - self.check_all() - - -if __name__ == '__main__': +class GatePublisher: + def create_gate(self): + msg = GateDetection() + msg.position = Point( + x=round(random.uniform(4.0, 5.0), 2), + y=round(random.uniform(2.0, 3.0), 2), + z=round(random.uniform(0.5, 1.5), 2) + ) + msg.confidence = round(random.uniform(0.8, 0.99), 2) + rospy.loginfo(f"[MOCK] Generated gate: Pos=({msg.position.x:.2f}, {msg.position.y:.2f}, {msg.position.z:.2f}), Conf={msg.confidence:.2f}") + return msg + +if __name__ == "__main__": try: tester = PrequalificationTester() - tester.run() - except rospy.ROSInterruptException: - rospy.loginfo("Prequalification tester interrupted.") + gate_pub = GatePublisher() + rate = rospy.Rate(1) + + while not rospy.is_shutdown(): + gate_msg = gate_pub.create_gate() + tester.gate_callback(gate_msg) + tester.check_all() + rate.sleep() -#How to Run code (Needs to be verified): -#roscore -#rosrun your_package prequalification_tester.py + except rospy.ROSInterruptException: + pass