Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion modules/communications/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def run(
) -> tuple[True, bytes, list[bytes]] | tuple[False, None, None]:

objects_in_world_global = []

for object_in_world in objects_in_world:
# We assume detected objects are on the ground
north = object_in_world.location_x
Expand Down Expand Up @@ -91,7 +92,7 @@ def run(
self.__logger.info(f"{time.time()}: {objects_in_world_global}")

encoded_position_global_objects = []
for object in object_in_world_global:
for object in objects_in_world_global:

result, message = message_encoding_decoding.encode_position_global(
worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, object
Expand Down
302 changes: 302 additions & 0 deletions tests/unit/test_communications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
"""
Tests the communications class.
"""

import pytest

from modules.communications import communications
from modules.common.modules.logger import logger
from modules import object_in_world
from modules.common.modules import position_local
from modules.common.modules.mavlink import local_global_conversion
from modules.common.modules import position_global
from modules.common.modules.data_encoding import metadata_encoding_decoding
from modules.common.modules.data_encoding import message_encoding_decoding
from modules.common.modules.data_encoding.worker_enum import WorkerEnum

# Test functions use test fixture signature names and access class privates
# No enable
# pylint: disable=protected-access,redefined-outer-name

LATITUDE_TOLERANCE = 0.000001
LONGITUDE_TOLERANCE = 0.000001
ALTITUDE_TOLERANCE = 7


@pytest.fixture
def home_position() -> position_global.PositionGlobal: # type: ignore
"""
Home position.
"""
# University of Waterloo WGS84 Coordinate
result, position = position_global.PositionGlobal.create(43.472978, -80.540103, 336.0)
assert result
assert position is not None

yield position


@pytest.fixture
def communications_maker(
home_position: position_global.PositionGlobal,
) -> communications.Communications: # type: ignore
"""
Construct a Communications instance with the Home position
"""
result, test_logger = logger.Logger.create("test_logger", False)

assert result
assert test_logger is not None

result, communications_instance = communications.Communications.create(
home_position, test_logger
)
assert result
assert communications_instance is not None

yield communications_instance # type: ignore


def object_in_world_from_position_local(
position_local: position_local.PositionLocal,
) -> object_in_world.ObjectInWorld:
"""
Convert position local to object_in_world as defined in Communications.py
"""
result, obj = object_in_world.ObjectInWorld.create(
position_local.north, position_local.east, 0.0
)
assert result
assert obj is not None

return obj


def assert_global_positions(
expected: position_global.PositionGlobal, actual: position_global.PositionGlobal
) -> None:
"""
Assert each values of the global positions using the Tolerances
"""
assert abs(expected.latitude - actual.latitude) < LATITUDE_TOLERANCE
assert abs(expected.longitude - actual.longitude) < LONGITUDE_TOLERANCE
assert abs(expected.altitude - actual.altitude) < ALTITUDE_TOLERANCE


class TestCommunications:
"""
Tests for the Communications.run() method.
"""

def test_run(
self,
home_position: position_global.PositionGlobal,
communications_maker: communications.Communications,
) -> None:
"""
Test if the Communications.run returns the correct instance
"""
# Setup
result, position = position_global.PositionGlobal.create(43.472978, -80.540103, 336.0)
assert result
assert position is not None

result, actual = local_global_conversion.position_local_from_position_global(
home_position, position
)
assert result
assert actual is not None

objects_in_world = [object_in_world_from_position_local(actual)]

# Run
result, metadata, generated_objects = communications_maker.run(objects_in_world)

# Test
assert result
assert isinstance(metadata, bytes)
assert all(isinstance(obj, bytes) for obj in generated_objects)

def test_normal(
self,
home_position: position_global.PositionGlobal,
communications_maker: communications.Communications,
) -> None:
"""
Normal
"""
# Setup
result, global_position_1 = position_global.PositionGlobal.create(
43.472978, -80.540103, 336.0
)
assert result
assert global_position_1 is not None

result, local_position_1 = local_global_conversion.position_local_from_position_global(
home_position, global_position_1
)
assert result
assert local_position_1 is not None

result, global_position_2 = position_global.PositionGlobal.create(
43.472800, -80.539500, 330.0
)
assert result
assert global_position_2 is not None

result, local_position_2 = local_global_conversion.position_local_from_position_global(
home_position, global_position_2
)
assert result
assert local_position_2 is not None

global_positions = [global_position_1, global_position_2]

objects_in_world = [
object_in_world_from_position_local(local_position_1),
object_in_world_from_position_local(local_position_2),
]
number_of_messages = len(objects_in_world)

# Run
result, metadata, generated_objects = communications_maker.run(objects_in_world)
assert result
assert isinstance(metadata, bytes)
assert all(isinstance(obj, bytes) for obj in generated_objects)

result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
metadata
)
assert result
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER

# Test
assert actual_number_of_messages == number_of_messages

# Conversion
for i, global_position in enumerate(global_positions):
result, worker_id, actual = message_encoding_decoding.decode_bytes_to_position_global(
generated_objects[i]
)
assert result
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER

assert_global_positions(global_position, actual)

def test_empty_objects(
self,
communications_maker: communications.Communications,
) -> None:
"""
When nothing is passed in
"""
objects_in_world = []

result, metadata, generated_objects = communications_maker.run(objects_in_world)
assert result
assert isinstance(metadata, bytes)
assert all(isinstance(obj, bytes) for obj in generated_objects)

result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
metadata
)

assert result
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
# Test
assert actual_number_of_messages == 0
assert len(generated_objects) == 0

def test_same_as_home(
self,
home_position: position_global.PositionGlobal,
communications_maker: communications.Communications,
) -> None:
"""
When the objects_in_world contains the home positions
"""
# Setup
result, local_position = local_global_conversion.position_local_from_position_global(
home_position, home_position
)
assert result
assert local_position is not None

actual = object_in_world_from_position_local(local_position)
objects_in_world = [actual]
number_of_messages = len(objects_in_world)

# Run
result, metadata, generated_objects = communications_maker.run(objects_in_world)
assert result
assert isinstance(metadata, bytes)
assert all(isinstance(obj, bytes) for obj in generated_objects)
# Conversion
result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
metadata
)
assert result
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER

# Test
assert actual_number_of_messages == number_of_messages

# Conversion
result, worker_id, actual = message_encoding_decoding.decode_bytes_to_position_global(
generated_objects[0]
)
assert result
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER

# Test
assert_global_positions(home_position, actual)

def test_duplicate_coordinates(
self,
home_position: position_global.PositionGlobal,
communications_maker: communications.Communications,
) -> None:
"""
When the objects_in_world contains duplicate positions
"""
# Setup
result, global_position = position_global.PositionGlobal.create(
43.472978, -80.540103, 336.0
)
assert result
assert global_position is not None

result, local_position = local_global_conversion.position_local_from_position_global(
home_position, global_position
)
assert result
assert local_position is not None

position = object_in_world_from_position_local(local_position)

objects_in_world = [position, position, position]
number_of_messages = len(objects_in_world)

# Run
result, metadata, generated_objects = communications_maker.run(objects_in_world)
assert result
assert isinstance(metadata, bytes)
assert all(isinstance(obj, bytes) for obj in generated_objects)

result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
metadata
)
assert result
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
# Test
assert actual_number_of_messages == number_of_messages

for generated_object in generated_objects:
# Conversion
result, worker_id, actual = message_encoding_decoding.decode_bytes_to_position_global(
generated_object
)
assert result
assert worker_id == WorkerEnum.COMMUNICATIONS_WORKER
# Test
assert_global_positions(global_position, actual)