Skip to content
25 changes: 19 additions & 6 deletions EosLib/packet/data_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from EosLib.device import Device

class DataHeader:

data_header_struct_format_string = "!" \
"B" \
"B" \
Expand Down Expand Up @@ -50,17 +49,18 @@ def validate_data_header(self):

:return: True if valid
"""
if not isinstance(self.sender, int) or not 0 <= self.sender <= 255 or self.sender == \
Device.NO_DEVICE:

if not isinstance(self.sender, int) or self.sender == definitions.Device.NO_DEVICE or \
not self.sender in definitions.Device:
raise DataHeaderFormatError("Invalid Sender")

if not isinstance(self.data_type, int) or not 0 <= self.data_type <= 255:
if not isinstance(self.data_type, int) or self.data_type not in definitions.Type:
raise DataHeaderFormatError("Invalid Type")

if not isinstance(self.priority, int) or not 0 <= self.priority <= 255:
if not isinstance(self.priority, int) or self.priority not in definitions.Priority:
raise DataHeaderFormatError("Invalid Priority")

if not isinstance(self.destination, int) or not 0 <= self.destination <= 255:
if not isinstance(self.destination, int) or self.destination not in definitions.Device:
raise DataHeaderFormatError("Invalid Destination")

if not isinstance(self.generate_time, datetime):
Expand Down Expand Up @@ -111,3 +111,16 @@ def decode(header_bytes: bytes):
decoded_header = DataHeader(unpacked[1], unpacked[2], unpacked[3], unpacked[4],
datetime.fromtimestamp(unpacked[5]))
return decoded_header


def check_data_header(data_header: DataHeader) -> bool:
""" Takes a packet data header and checks to see if it is valid

:return: boolean True if valid
"""
if data_header is None:
raise PacketFormatError("All packets must have a data header")
else:
data_header.validate_data_header()

return True
56 changes: 53 additions & 3 deletions EosLib/packet/definitions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
from enum import IntEnum, unique
from enum import IntEnum, unique, EnumMeta


class EnumMetaClass(EnumMeta):
def __contains__(cls, item):
try:
cls(item)
except ValueError:
return False
else:
return True


@unique
class Type(IntEnum):
class Type(IntEnum, metaclass=EnumMetaClass):
NO_TYPE = 0
TELEMETRY = 1
WARNING = 2
Expand All @@ -15,7 +25,7 @@ class Type(IntEnum):


@unique
class Priority(IntEnum):
class Priority(IntEnum, metaclass=EnumMetaClass):
NO_TRANSMIT = 0
URGENT = 11
TELEMETRY = 9
Expand All @@ -25,6 +35,46 @@ class Priority(IntEnum):
ERROR = 255


@unique
class Device(IntEnum, metaclass=EnumMetaClass):
NO_DEVICE = 0
PRESSURE = 1
PARTICULATES = 2
IR_VISIBLE_LIGHT = 3
VISIBLE_UVA_LIGHT = 4
UVA_UVB_LIGHT = 5
CO2 = 6
O3 = 7
MISC_SENSOR_1 = 8
MISC_SENSOR_2 = 9
MISC_SENSOR_3 = 10
MISC_SENSOR_4 = 11
REEFING_MOTOR = 12
MISC_ENGINEERING_1 = 13
MISC_ENGINEERING_2 = 14
CAMERA_1 = 15
CAMERA_2 = 16
MISC_CAMERA_1 = 17
MISC_CAMERA_2 = 18
RADIO = 19
MISC_RADIO_1 = 20
MISC_RADIO_2 = 21
GPS = 22
IMU = 23
MISC_1 = 24
MISC_2 = 25
MISC_3 = 26
MISC_4 = 27
TEMPERATURE_HUMIDITY = 28
MISC_TEST_1 = 29
MISC_TEST_2 = 30
MISC_TEST_3 = 31
GROUND_STATION_1 = 32
GROUND_STATION_2 = 33
GROUND_STATION_3 = 34
ORCHEOSTRATOR = 35


@unique
class HeaderPreamble(IntEnum):
V010DATA = 2
Expand Down
50 changes: 50 additions & 0 deletions EosLib/packet/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, body: bytes, data_header: DataHeader, transmit_header: Transm
self.body = body # type: bytes
self.data_header = data_header # type: DataHeader
self.transmit_header = transmit_header # type: TransmitHeader
self.validate_packet() # checks if packet is valid

def __eq__(self, other):
""" Compares two packets for value equality
Expand Down Expand Up @@ -129,6 +130,55 @@ def encode_to_string(self):
data_header=self.data_header.encode_to_string(),
body=self.body.decode())

def set_data_header(self, new_data_header: DataHeader) -> bool:
""" Setter that sets new data header

:return: boolean True set is successful
:raises: PacketFormatError if data header is none
"""
if new_data_header is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is very incorrect

if new_data_header.validate_data_header():
self.data_header = new_data_header
return True

raise PacketFormatError("data header can not be none")

def set_transmit_header(self, new_transmit_header: TransmitHeader) -> bool:
""" Setter that sets new transmit header

:return: boolean True set is successful
"""
if new_transmit_header is None:
self.transmit_header = new_transmit_header
return True

if new_transmit_header.validate_transmit_header():
self.transmit_header = new_transmit_header

return True

def set_body(self, new_body: bytes) -> bool:
""" Setter that sets new body

:return: boolean True set is successful
"""
if new_body is not None and len(new_body) == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the body validator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to make a function or is there already a function for this? If so I can't find it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth making one, there's good logic in validate_packet.

Also this function's logic is wrong (only sets if length 0) and it doesn't return something even though the docstring specifies that it should

self.body = new_body
return True

return False

@staticmethod
def check_body(body: bytes) -> bool:
""" Takes a packet body and checks to see if it is valid

:return: boolean True if valid
"""
if body is None or len(body) == 0 or not isinstance(body, bytes):
raise PacketFormatError("All packets must have a body")

return True

@staticmethod
def decode(packet_bytes: bytes):
"""Takes a bytes object and decodes it into a Packet object.
Expand Down
11 changes: 11 additions & 0 deletions EosLib/packet/transmit_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,14 @@ def decode(header_bytes: bytes):

decoded_header = TransmitHeader(unpacked[1], datetime.fromtimestamp(unpacked[3]), unpacked[2])
return decoded_header


def check_transmit_header(transmit_header: TransmitHeader) -> bool:
""" Takes a packet transmit header and checks to see if it is valid

:return: boolean True if valid
"""
if transmit_header is not None:
transmit_header.validate_transmit_header()

return True
42 changes: 41 additions & 1 deletion tests/packet/test_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from datetime import datetime
from EosLib.packet.packet import TransmitHeader, DataHeader, Packet, PacketFormatError
from EosLib.packet.exceptions import DataHeaderFormatError, TransmitHeaderFormatError
from EosLib.packet.data_header import check_data_header
from EosLib.packet.transmit_header import check_transmit_header
from EosLib.device import Device


Expand Down Expand Up @@ -294,4 +296,42 @@ def test_packet_print_no_body(packet):
"\tGenerate Time: 2001-01-07 01:23:45\n" \
"No body"

assert expected_string == packet.__str__()

def test_set_data_header():
test_packet = get_valid_packet()
data_header = DataHeader(definitions.Device.GPS,
definitions.Type.TELEMETRY,
definitions.Priority.TELEMETRY,
definitions.Device.GPS,
datetime.now())

test_packet.set_data_header(data_header)


def test_set_transmit_header():
test_packet = get_valid_packet()
transmit_header = TransmitHeader(0, datetime.now())

test_packet.set_transmit_header(transmit_header)


def test_set_body():
test_packet = get_valid_packet()
body = bytes("temp", 'utf-8')

test_packet.set_body(body)


def test_check_data_header():
test_packet = get_valid_packet()
check_data_header(test_packet.data_header)


def test_check_transmit_header():
test_packet = get_valid_packet()
check_transmit_header(test_packet.transmit_header)


def test_check_body():
test_packet = get_valid_packet()
Packet.check_body(test_packet.body)