Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions rsl_comm_py/um7_broadcast_packets.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,30 @@ def __repr__(self):
@dataclass
class UM7HealthPacket:
health: int
sats_used: int
hdop: int
sats_in_view: int
ovf: bool
mg_n: bool
acc_n: bool
accel: bool
gyro: bool
mag: bool
gps: bool

def __repr__(self):
return f"HealthPacket("\
f"raw_value=0x{self.health:04X} -> " \
f"SATS_USED={(self.health >> 26) & 0x3F}, " \
f"HDOP={(self.health >> 16) & 0x7F}, " \
f"SATS_IN_VIEW={(self.health >> 10) & 0x3F}, " \
f"OVF={bool((self.health >> 8) & 0x01)}, " \
f"MG_N={bool((self.health >> 5) & 0x01)}, " \
f"ACC_N={bool((self.health >> 4) & 0x01)}, " \
f"ACCEL={bool((self.health >> 3) & 0x01)}, "\
f"GYRO={bool((self.health >> 2) & 0x01)}, " \
f"MAG={bool((self.health >> 1) & 0x01)}, " \
f"GPS={bool((self.health >> 0) & 0x01)})"
f"SATS_USED={self.sats_used}, " \
f"HDOP={self.hdop}, " \
f"SATS_IN_VIEW={self.sats_in_view}, " \
f"OVF={self.ovf}, " \
f"MG_N={self.mg_n}, " \
f"ACC_N={self.acc_n}, " \
f"ACCEL={self.accel}, "\
f"GYRO={self.gyro}, " \
f"MAG={self.mag}, " \
f"GPS={self.gps})"


@dataclass
Expand Down Expand Up @@ -202,6 +212,24 @@ def __repr__(self):
f"gyro_bias=[{self.gyro_bias_x:>+8.3f}, {self.gyro_bias_y:>+8.3f}, {self.gyro_bias_z:>+8.3f}])"


@dataclass
class UM7GPSPacket:
gps_latitude: float
gps_longitude: float
gps_altitude: float
gps_course: float
gps_speed: float
gps_time: float

def __repr__(self):
return f"GPSPacket("\
f"latitude={self.gps_latitude}, " \
f"longitude={self.gps_longitude}, " \
f"course={self.gps_course}, " \
f"speed={self.gps_speed}, " \
f"time_stamp={self.gps_time})"


if __name__ == '__main__':
pass

41 changes: 38 additions & 3 deletions rsl_comm_py/um7_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from rsl_comm_py.um7_broadcast_packets import UM7AllRawPacket, UM7HealthPacket, UM7GyroBiasPacket, UM7ProcMagPacket, \
UM7ProcGyroPacket, UM7ProcAccelPacket, UM7RawMagPacket, UM7RawGyroPacket, UM7RawAccelPacket, UM7QuaternionPacket, \
UM7EulerPacket, UM7AllProcPacket
UM7EulerPacket, UM7AllProcPacket, UM7GPSPacket, UM7VelocityPacket
from rsl_comm_py.um7_registers import UM7Registers


Expand Down Expand Up @@ -389,6 +389,8 @@ def recv_broadcast(self, num_packets: int = -1, flush_buffer_on_start: bool = F
mag_raw_start_addr = self.svd_parser.find_register_by(name='DREG_MAG_RAW_XY').address
gyro_bias_start_addr = self.svd_parser.find_register_by(name='DREG_GYRO_BIAS_X').address
quat_addr = self.svd_parser.find_register_by(name='DREG_QUAT_AB').address
gps_addr = self.svd_parser.find_register_by(name='DREG_GPS_LATITUDE').address
velocity_addr = self.svd_parser.find_register_by(name='DREG_VELOCITY_NORTH').address

if packet_addr == health_start_addr:
if len(packet) == 11:
Expand All @@ -398,7 +400,7 @@ def recv_broadcast(self, num_packets: int = -1, flush_buffer_on_start: bool = F
else:
logging.error(f"[HEALTH]: invalid packet length, got {len(packet)}, packet: {packet}!")
elif packet_addr == euler_start_addr:
if len(packet) == 27:
if len(packet) == 43:
logging.info("[EULER]: broadcast packet found!")
yield self.decode_euler_broadcast(packet)
received_packets += 1
Expand Down Expand Up @@ -468,6 +470,20 @@ def recv_broadcast(self, num_packets: int = -1, flush_buffer_on_start: bool = F
received_packets += 1
else:
logging.error(f"[GYRO_1_BIAS]: invalid packet length, got {len(packet)}, packet: {packet}!")
elif packet_addr == gps_addr:
if len(packet) == 31:
logging.info("[GPS]: broadcast packet found!")
yield self.decode_gps_broadcast(packet)
received_packets += 1
else:
logging.error(f"[GPS]: invalid packet length, got {len(packet)}, packet: {packet}!")
elif packet_addr == velocity_addr:
if len(packet) == 23:
logging.info("[VELOCITY]: broadcast packet found!")
yield self.decode_velocity_broadcast(packet)
received_packets += 1
else:
logging.error(f"[VELOCITY]: invalid packet length, got {len(packet)}, packet: {packet}!")
else:
logging.error(f"[BROADCAST ERROR]: packet with addr {packet_addr}, reg: {start_reg.name} found "
f"of length: {len(packet)} bytes, "
Expand Down Expand Up @@ -545,10 +561,29 @@ def decode_gyro_bias_broadcast(self, packet) -> UM7GyroBiasPacket:
gyro_bias_x, gyro_bias_y, gyro_bias_z, = struct.unpack('>fff', payload[0:12])
return UM7GyroBiasPacket(gyro_bias_x=gyro_bias_x, gyro_bias_y=gyro_bias_y, gyro_bias_z=gyro_bias_z)

def decode_gps_broadcast(self, packet) -> UM7GPSPacket:
payload = packet[5:-2]
gps_latitude, gps_longitude, gps_altitude, gps_course, gps_speed, gps_time = struct.unpack('>ffffff', payload[0:24])
return UM7GPSPacket(
gps_latitude=gps_latitude, gps_longitude=gps_longitude, gps_altitude=gps_altitude, gps_course=gps_course, gps_speed=gps_speed, gps_time=gps_time
)

def decode_velocity_broadcast(self, packet) -> UM7VelocityPacket:
payload = packet[5:-2]
velocity_north, velocity_east, velocity_up, velocity_time = struct.unpack('>ffff', payload[0:16])
return UM7VelocityPacket(
velocity_north=velocity_north, velocity_east=velocity_east, velocity_up=velocity_up, velocity_time=velocity_time
)

def decode_health_broadcast(self, packet) -> UM7HealthPacket:
payload = packet[5:-2]
health, = struct.unpack('>I', payload[0:4])
return UM7HealthPacket(health=health)
return UM7HealthPacket(
health=health, sats_used=(health >> 26) & 0x3F, hdop=(health >> 16) & 0x7F, sats_in_view=(health >> 10) & 0x3F,
ovf=bool((health >> 8) & 0x01), mg_n=bool((health >> 5) & 0x01), acc_n=bool((health >> 4) & 0x01),
accel=bool((health >> 3) & 0x01), gyro=bool((health >> 2) & 0x01), mag=bool((health >> 1) & 0x01),
gps=bool((health >> 0) & 0x01)
)

def hidden_regs_values(self) -> List[Dict]:
hidden_regs_as_json = []
Expand Down