Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 20 additions & 41 deletions app/actions/edgetech/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ def _get_default_filters(self) -> Dict[str, Any]:
start_datetime = datetime.now(timezone.utc) - timedelta(minutes=30)
return {"start_datetime": start_datetime}

@staticmethod
def _utcnow() -> datetime:
"""Return the current time as a timezone-aware UTC datetime.

Kept as a separate method so tests can patch it with a fixed value
instead of depending on the real clock.
"""
return datetime.now(timezone.utc)

def _remove_milliseconds(self, dt: datetime) -> datetime:
"""
Remove milliseconds from a datetime object.
Expand Down Expand Up @@ -90,12 +95,16 @@ async def _create_gear_payload(

last_updated = buoy.currentState.lastUpdated
last_deployed = buoy.currentState.dateDeployed or last_updated
# For initial deployments, use dateDeployed as recorded_at
# For updates (position changes), use lastUpdated since dateDeployed doesn't change
# For initial deployments, use dateDeployed as recorded_at.
# For updates (position changes), use current time so that recorded_at
# differs from previous syncs. EdgeTech can update latDeg/lonDeg
# without changing lastUpdated, so reusing lastUpdated as recorded_at
# could collide with a previously-accepted observation and be rejected
# by ER's (device_id, recorded_at) unique constraint.
if include_initial_deployment:
deployment_recorded_at = last_deployed or datetime.now(timezone.utc)
deployment_recorded_at = last_deployed or self._utcnow()
else:
deployment_recorded_at = last_updated or datetime.now(timezone.utc)
deployment_recorded_at = self._utcnow()

# Create devices list
devices = []
Expand All @@ -120,18 +129,11 @@ async def _create_gear_payload(
secondary_last_deployed = (
end_unit_buoy.currentState.dateDeployed or last_updated
)
end_unit_last_updated = (
end_unit_buoy.currentState.lastUpdated or last_updated
)
# Use same logic as main device for recorded_at
if include_initial_deployment:
secondary_recorded_at = secondary_last_deployed or datetime.now(
timezone.utc
)
secondary_recorded_at = secondary_last_deployed or self._utcnow()
else:
secondary_recorded_at = end_unit_last_updated or datetime.now(
timezone.utc
)
secondary_recorded_at = deployment_recorded_at
secondary_device_additional_data = json.loads(end_unit_buoy.json())
secondary_device_additional_data.pop("changeRecords", None)
elif end_unit_device_from_er:
Expand Down Expand Up @@ -665,34 +667,11 @@ async def _identify_buoys(
er_last_updated = er_gear.last_updated
has_newer_data = edgetech_last_updated > er_last_updated

# Check if recorded_at would be different - ER/Buoy rejects duplicates
# based on device_id + recorded_at unique constraint
# For updates, use lastUpdated (not dateDeployed) since position changes
# don't update dateDeployed - only re-deployments do
edgetech_recorded_at = self._remove_milliseconds(
edgetech_last_updated
)
er_device_last_deployed = None
for device in er_gear.devices:
if device.mfr_device_id in (
primary_subject_name,
standard_subject_name,
):
er_device_last_deployed = device.last_deployed
break

# If recorded_at would be the same as what's already in ER, skip update
# to avoid duplicate rejection
if er_device_last_deployed:
er_recorded_at = self._remove_milliseconds(
er_device_last_deployed
)
if edgetech_recorded_at == er_recorded_at:
logger.info(
f"Buoy {serial_number_user_id} skipped - recorded_at {edgetech_recorded_at} "
f"already exists in ER (would be rejected as duplicate)"
)
continue
# No recorded_at dedup check is needed here: position-only updates now
# use _utcnow() as recorded_at, so a collision with a previously
# accepted observation is unlikely. Two updates for the same device
# processed within the same second could still collide after
# _remove_milliseconds truncation.

if location_changed or has_newer_data:
to_update.add(serial_number_user_id)
Expand Down
35 changes: 24 additions & 11 deletions app/actions/tests/test_edgetech_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1415,7 +1415,8 @@ async def test_identify_buoys_position_update_not_skipped_same_date_deployed(

This is the bug fix test: Previously, position updates were incorrectly skipped because
the code used dateDeployed for recorded_at, and dateDeployed doesn't change on position updates.
Now we use lastUpdated for updates, so position changes get a new recorded_at timestamp.
Now we use _utcnow() for updates, so position changes get a fresh recorded_at
that is unlikely to collide with previously-accepted observations.
"""
# Create EdgeTech buoy with:
# - dateDeployed: 22:40:08 (same as ER)
Expand Down Expand Up @@ -1498,14 +1499,22 @@ async def test_identify_buoys_position_update_not_skipped_same_date_deployed(
assert "skipped - recorded_at" not in caplog.text

@pytest.mark.asyncio
async def test_create_gear_payload_uses_last_updated_for_updates(self):
async def test_create_gear_payload_uses_current_time_for_update_recorded_at(
self, mocker
):
"""
Test that _create_gear_payload uses lastUpdated (not dateDeployed) for recorded_at
when include_initial_deployment=False (i.e., for updates).
Test that _create_gear_payload uses _utcnow() for recorded_at
when include_initial_deployment=False (i.e., for position updates).

This ensures position updates get a unique recorded_at timestamp.
EdgeTech can update latDeg/lonDeg without changing lastUpdated, so
reusing lastUpdated as recorded_at could collide with a previously-
accepted observation and be rejected by ER's (device_id, recorded_at)
unique constraint.
"""
fake_now = datetime(2026, 3, 20, 15, 30, 0, tzinfo=timezone.utc)

processor = EdgeTechProcessor(data=[], er_token="token", er_url="url")
mocker.patch.object(processor, "_utcnow", return_value=fake_now)

buoy_data = {
"serialNumber": "TEST123",
Expand Down Expand Up @@ -1551,12 +1560,12 @@ async def test_create_gear_payload_uses_last_updated_for_updates(self):
include_initial_deployment=False,
)

# For updates, recorded_at should be lastUpdated (the fix!)
# For updates, recorded_at should be the mocked current time
assert (
payload_update["devices"][0]["recorded_at"] == "2026-01-15T22:42:49+00:00"
payload_update["devices"][0]["recorded_at"] == "2026-03-20T15:30:00+00:00"
)

# Verify the timestamps are different
# Verify the timestamps are different from initial deployment
assert (
payload_initial["devices"][0]["recorded_at"]
!= payload_update["devices"][0]["recorded_at"]
Expand All @@ -1569,7 +1578,7 @@ async def test_position_update_end_to_end(self, mocker, caplog):

Scenario: A deployed gear has its position updated in EdgeTech, but dateDeployed
remains unchanged. The update should be processed and create a payload with
the new position and lastUpdated as recorded_at.
the new position and a fresh recorded_at (current time) to avoid collisions.
"""
# EdgeTech data with position change
edgetech_data = {
Expand Down Expand Up @@ -1625,6 +1634,9 @@ async def test_position_update_end_to_end(self, mocker, caplog):
mock_er_client.get_sources = AsyncMock(return_value=[])
processor._er_client = mock_er_client

fake_now = datetime(2026, 3, 20, 16, 0, 0, tzinfo=timezone.utc)
mocker.patch.object(processor, "_utcnow", return_value=fake_now)

with caplog.at_level(logging.INFO):
payloads = await processor.process()

Expand All @@ -1638,8 +1650,9 @@ async def test_position_update_end_to_end(self, mocker, caplog):
assert device["location"]["latitude"] == 41.0
assert device["location"]["longitude"] == -71.0

# Key assertion: recorded_at should be lastUpdated, not dateDeployed
assert device["recorded_at"] == "2026-01-15T12:00:00+00:00"
# Key assertion: recorded_at should be the mocked current time
# (not lastUpdated or dateDeployed) to avoid duplicate rejection
assert device["recorded_at"] == "2026-03-20T16:00:00+00:00"

# Verify logs show update (not skipped)
assert "marked for update" in caplog.text
Expand Down
Loading