From f064dc574c8a57699547faa10736ad6d9a182e40 Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Fri, 20 Mar 2026 16:57:49 -0700 Subject: [PATCH 1/4] fix: position updates not always getting sent --- app/actions/edgetech/processor.py | 41 +++++-------------- app/actions/tests/test_edgetech_processor.py | 43 ++++++++++++++------ 2 files changed, 41 insertions(+), 43 deletions(-) diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index bb72ddd0..93363551 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -90,12 +90,16 @@ async def _create_gear_payload( last_updated = buoy.currentState.lastUpdated last_deployed = buoy.currentState.dateDeployed or last_updated - # For initial deployments, use dateDeployed as recorded_at - # For updates (position changes), use lastUpdated since dateDeployed doesn't change + # For initial deployments, use dateDeployed as recorded_at. + # For updates (position changes), use current time so the recorded_at is + # always unique. EdgeTech can update latDeg/lonDeg without changing + # lastUpdated, so reusing lastUpdated as recorded_at would collide with + # a previously-accepted observation and be rejected by ER's + # (device_id, recorded_at) unique constraint. if include_initial_deployment: deployment_recorded_at = last_deployed or datetime.now(timezone.utc) else: - deployment_recorded_at = last_updated or datetime.now(timezone.utc) + deployment_recorded_at = datetime.now(timezone.utc) # Create devices list devices = [] @@ -665,34 +669,9 @@ async def _identify_buoys( er_last_updated = er_gear.last_updated has_newer_data = edgetech_last_updated > er_last_updated - # Check if recorded_at would be different - ER/Buoy rejects duplicates - # based on device_id + recorded_at unique constraint - # For updates, use lastUpdated (not dateDeployed) since position changes - # don't update dateDeployed - only re-deployments do - edgetech_recorded_at = self._remove_milliseconds( - edgetech_last_updated - ) - er_device_last_deployed = None - for device in er_gear.devices: - if device.mfr_device_id in ( - primary_subject_name, - standard_subject_name, - ): - er_device_last_deployed = device.last_deployed - break - - # If recorded_at would be the same as what's already in ER, skip update - # to avoid duplicate rejection - if er_device_last_deployed: - er_recorded_at = self._remove_milliseconds( - er_device_last_deployed - ) - if edgetech_recorded_at == er_recorded_at: - logger.info( - f"Buoy {serial_number_user_id} skipped - recorded_at {edgetech_recorded_at} " - f"already exists in ER (would be rejected as duplicate)" - ) - continue + # No recorded_at dedup check needed here: position-only + # updates now use datetime.now(utc) as recorded_at, so they + # are always unique and will not be rejected by ER. if location_changed or has_newer_data: to_update.add(serial_number_user_id) diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py index 9e1e5258..e98049be 100644 --- a/app/actions/tests/test_edgetech_processor.py +++ b/app/actions/tests/test_edgetech_processor.py @@ -1415,7 +1415,7 @@ async def test_identify_buoys_position_update_not_skipped_same_date_deployed( This is the bug fix test: Previously, position updates were incorrectly skipped because the code used dateDeployed for recorded_at, and dateDeployed doesn't change on position updates. - Now we use lastUpdated for updates, so position changes get a new recorded_at timestamp. + Now we use datetime.now(utc) for updates, so position changes always get a unique recorded_at. """ # Create EdgeTech buoy with: # - dateDeployed: 22:40:08 (same as ER) @@ -1498,12 +1498,15 @@ async def test_identify_buoys_position_update_not_skipped_same_date_deployed( assert "skipped - recorded_at" not in caplog.text @pytest.mark.asyncio - async def test_create_gear_payload_uses_last_updated_for_updates(self): + async def test_create_gear_payload_uses_current_time_for_update_recorded_at(self): """ - Test that _create_gear_payload uses lastUpdated (not dateDeployed) for recorded_at - when include_initial_deployment=False (i.e., for updates). + Test that _create_gear_payload uses datetime.now(utc) for recorded_at + when include_initial_deployment=False (i.e., for position updates). - This ensures position updates get a unique recorded_at timestamp. + EdgeTech can update latDeg/lonDeg without changing lastUpdated, so + reusing lastUpdated as recorded_at would collide with a previously- + accepted observation and be rejected by ER's (device_id, recorded_at) + unique constraint. Using current time guarantees uniqueness. """ processor = EdgeTechProcessor(data=[], er_token="token", er_url="url") @@ -1551,12 +1554,20 @@ async def test_create_gear_payload_uses_last_updated_for_updates(self): include_initial_deployment=False, ) - # For updates, recorded_at should be lastUpdated (the fix!) - assert ( - payload_update["devices"][0]["recorded_at"] == "2026-01-15T22:42:49+00:00" + # For updates, recorded_at should be current time (not lastUpdated) + update_recorded_at = datetime.fromisoformat( + payload_update["devices"][0]["recorded_at"] + ) + # Should be close to now (within 5 seconds) and NOT equal to lastUpdated or dateDeployed + assert update_recorded_at != datetime( + 2026, 1, 15, 22, 42, 49, tzinfo=timezone.utc + ) + assert update_recorded_at != datetime( + 2026, 1, 15, 22, 40, 8, tzinfo=timezone.utc ) + assert (datetime.now(timezone.utc) - update_recorded_at).total_seconds() < 5 - # Verify the timestamps are different + # Verify the timestamps are different from initial deployment assert ( payload_initial["devices"][0]["recorded_at"] != payload_update["devices"][0]["recorded_at"] @@ -1569,7 +1580,7 @@ async def test_position_update_end_to_end(self, mocker, caplog): Scenario: A deployed gear has its position updated in EdgeTech, but dateDeployed remains unchanged. The update should be processed and create a payload with - the new position and lastUpdated as recorded_at. + the new position and a unique recorded_at (current time). """ # EdgeTech data with position change edgetech_data = { @@ -1638,8 +1649,16 @@ async def test_position_update_end_to_end(self, mocker, caplog): assert device["location"]["latitude"] == 41.0 assert device["location"]["longitude"] == -71.0 - # Key assertion: recorded_at should be lastUpdated, not dateDeployed - assert device["recorded_at"] == "2026-01-15T12:00:00+00:00" + # Key assertion: recorded_at should be current time (not lastUpdated or dateDeployed) + # to avoid duplicate rejection when EdgeTech updates location without changing lastUpdated + update_recorded_at = datetime.fromisoformat(device["recorded_at"]) + assert update_recorded_at != datetime( + 2026, 1, 15, 12, 0, 0, tzinfo=timezone.utc + ) + assert update_recorded_at != datetime( + 2026, 1, 15, 10, 0, 0, tzinfo=timezone.utc + ) + assert (datetime.now(timezone.utc) - update_recorded_at).total_seconds() < 5 # Verify logs show update (not skipped) assert "marked for update" in caplog.text From 7573a7de83fc803a35faf278c8aaa2f2c82f1014 Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Fri, 20 Mar 2026 17:14:47 -0700 Subject: [PATCH 2/4] address PR reviews, cleanup code --- app/actions/edgetech/processor.py | 25 ++++++----- app/actions/tests/test_edgetech_processor.py | 46 +++++++++----------- 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index 93363551..17ab5634 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -50,6 +50,11 @@ def _get_default_filters(self) -> Dict[str, Any]: start_datetime = datetime.now(timezone.utc) - timedelta(minutes=30) return {"start_datetime": start_datetime} + @staticmethod + def _utcnow() -> datetime: + """Return the current UTC time. Extracted for testability.""" + return datetime.now(timezone.utc) + def _remove_milliseconds(self, dt: datetime) -> datetime: """ Remove milliseconds from a datetime object. @@ -91,15 +96,15 @@ async def _create_gear_payload( last_updated = buoy.currentState.lastUpdated last_deployed = buoy.currentState.dateDeployed or last_updated # For initial deployments, use dateDeployed as recorded_at. - # For updates (position changes), use current time so the recorded_at is - # always unique. EdgeTech can update latDeg/lonDeg without changing - # lastUpdated, so reusing lastUpdated as recorded_at would collide with - # a previously-accepted observation and be rejected by ER's - # (device_id, recorded_at) unique constraint. + # For updates (position changes), use current time so that recorded_at + # differs from previous syncs. EdgeTech can update latDeg/lonDeg + # without changing lastUpdated, so reusing lastUpdated as recorded_at + # could collide with a previously-accepted observation and be rejected + # by ER's (device_id, recorded_at) unique constraint. if include_initial_deployment: - deployment_recorded_at = last_deployed or datetime.now(timezone.utc) + deployment_recorded_at = last_deployed or self._utcnow() else: - deployment_recorded_at = datetime.now(timezone.utc) + deployment_recorded_at = self._utcnow() # Create devices list devices = [] @@ -133,9 +138,7 @@ async def _create_gear_payload( timezone.utc ) else: - secondary_recorded_at = end_unit_last_updated or datetime.now( - timezone.utc - ) + secondary_recorded_at = deployment_recorded_at secondary_device_additional_data = json.loads(end_unit_buoy.json()) secondary_device_additional_data.pop("changeRecords", None) elif end_unit_device_from_er: @@ -671,7 +674,7 @@ async def _identify_buoys( # No recorded_at dedup check needed here: position-only # updates now use datetime.now(utc) as recorded_at, so they - # are always unique and will not be rejected by ER. + # will not collide with previously-accepted observations. if location_changed or has_newer_data: to_update.add(serial_number_user_id) diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py index e98049be..18689bef 100644 --- a/app/actions/tests/test_edgetech_processor.py +++ b/app/actions/tests/test_edgetech_processor.py @@ -1415,7 +1415,8 @@ async def test_identify_buoys_position_update_not_skipped_same_date_deployed( This is the bug fix test: Previously, position updates were incorrectly skipped because the code used dateDeployed for recorded_at, and dateDeployed doesn't change on position updates. - Now we use datetime.now(utc) for updates, so position changes always get a unique recorded_at. + Now we use datetime.now(utc) for updates, so position changes get a fresh recorded_at + that won't collide with previously-accepted observations. """ # Create EdgeTech buoy with: # - dateDeployed: 22:40:08 (same as ER) @@ -1498,17 +1499,22 @@ async def test_identify_buoys_position_update_not_skipped_same_date_deployed( assert "skipped - recorded_at" not in caplog.text @pytest.mark.asyncio - async def test_create_gear_payload_uses_current_time_for_update_recorded_at(self): + async def test_create_gear_payload_uses_current_time_for_update_recorded_at( + self, mocker + ): """ Test that _create_gear_payload uses datetime.now(utc) for recorded_at when include_initial_deployment=False (i.e., for position updates). EdgeTech can update latDeg/lonDeg without changing lastUpdated, so - reusing lastUpdated as recorded_at would collide with a previously- + reusing lastUpdated as recorded_at could collide with a previously- accepted observation and be rejected by ER's (device_id, recorded_at) - unique constraint. Using current time guarantees uniqueness. + unique constraint. """ + fake_now = datetime(2026, 3, 20, 15, 30, 0, tzinfo=timezone.utc) + processor = EdgeTechProcessor(data=[], er_token="token", er_url="url") + mocker.patch.object(processor, "_utcnow", return_value=fake_now) buoy_data = { "serialNumber": "TEST123", @@ -1554,18 +1560,10 @@ async def test_create_gear_payload_uses_current_time_for_update_recorded_at(self include_initial_deployment=False, ) - # For updates, recorded_at should be current time (not lastUpdated) - update_recorded_at = datetime.fromisoformat( - payload_update["devices"][0]["recorded_at"] - ) - # Should be close to now (within 5 seconds) and NOT equal to lastUpdated or dateDeployed - assert update_recorded_at != datetime( - 2026, 1, 15, 22, 42, 49, tzinfo=timezone.utc - ) - assert update_recorded_at != datetime( - 2026, 1, 15, 22, 40, 8, tzinfo=timezone.utc + # For updates, recorded_at should be the mocked current time + assert ( + payload_update["devices"][0]["recorded_at"] == "2026-03-20T15:30:00+00:00" ) - assert (datetime.now(timezone.utc) - update_recorded_at).total_seconds() < 5 # Verify the timestamps are different from initial deployment assert ( @@ -1580,7 +1578,7 @@ async def test_position_update_end_to_end(self, mocker, caplog): Scenario: A deployed gear has its position updated in EdgeTech, but dateDeployed remains unchanged. The update should be processed and create a payload with - the new position and a unique recorded_at (current time). + the new position and a fresh recorded_at (current time) to avoid collisions. """ # EdgeTech data with position change edgetech_data = { @@ -1636,6 +1634,9 @@ async def test_position_update_end_to_end(self, mocker, caplog): mock_er_client.get_sources = AsyncMock(return_value=[]) processor._er_client = mock_er_client + fake_now = datetime(2026, 3, 20, 16, 0, 0, tzinfo=timezone.utc) + mocker.patch.object(processor, "_utcnow", return_value=fake_now) + with caplog.at_level(logging.INFO): payloads = await processor.process() @@ -1649,16 +1650,9 @@ async def test_position_update_end_to_end(self, mocker, caplog): assert device["location"]["latitude"] == 41.0 assert device["location"]["longitude"] == -71.0 - # Key assertion: recorded_at should be current time (not lastUpdated or dateDeployed) - # to avoid duplicate rejection when EdgeTech updates location without changing lastUpdated - update_recorded_at = datetime.fromisoformat(device["recorded_at"]) - assert update_recorded_at != datetime( - 2026, 1, 15, 12, 0, 0, tzinfo=timezone.utc - ) - assert update_recorded_at != datetime( - 2026, 1, 15, 10, 0, 0, tzinfo=timezone.utc - ) - assert (datetime.now(timezone.utc) - update_recorded_at).total_seconds() < 5 + # Key assertion: recorded_at should be the mocked current time + # (not lastUpdated or dateDeployed) to avoid duplicate rejection + assert device["recorded_at"] == "2026-03-20T16:00:00+00:00" # Verify logs show update (not skipped) assert "marked for update" in caplog.text From 4feaaf0d70d16bc7c59a39a005b9021239788274 Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Fri, 20 Mar 2026 21:00:53 -0700 Subject: [PATCH 3/4] address PR reviews, cleanup code --- app/actions/edgetech/processor.py | 9 ++++----- app/actions/tests/test_edgetech_processor.py | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index 17ab5634..7e5bcb9d 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -129,9 +129,6 @@ async def _create_gear_payload( secondary_last_deployed = ( end_unit_buoy.currentState.dateDeployed or last_updated ) - end_unit_last_updated = ( - end_unit_buoy.currentState.lastUpdated or last_updated - ) # Use same logic as main device for recorded_at if include_initial_deployment: secondary_recorded_at = secondary_last_deployed or datetime.now( @@ -673,8 +670,10 @@ async def _identify_buoys( has_newer_data = edgetech_last_updated > er_last_updated # No recorded_at dedup check needed here: position-only - # updates now use datetime.now(utc) as recorded_at, so they - # will not collide with previously-accepted observations. + # updates now use _utcnow() as recorded_at, which makes + # collisions with previously-accepted observations unlikely, + # though updates processed within the same second could still + # collide after _remove_milliseconds truncation. if location_changed or has_newer_data: to_update.add(serial_number_user_id) diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py index 18689bef..1c0857b0 100644 --- a/app/actions/tests/test_edgetech_processor.py +++ b/app/actions/tests/test_edgetech_processor.py @@ -1415,8 +1415,8 @@ async def test_identify_buoys_position_update_not_skipped_same_date_deployed( This is the bug fix test: Previously, position updates were incorrectly skipped because the code used dateDeployed for recorded_at, and dateDeployed doesn't change on position updates. - Now we use datetime.now(utc) for updates, so position changes get a fresh recorded_at - that won't collide with previously-accepted observations. + Now we use _utcnow() for updates, so position changes get a fresh recorded_at + that is unlikely to collide with previously-accepted observations. """ # Create EdgeTech buoy with: # - dateDeployed: 22:40:08 (same as ER) @@ -1503,7 +1503,7 @@ async def test_create_gear_payload_uses_current_time_for_update_recorded_at( self, mocker ): """ - Test that _create_gear_payload uses datetime.now(utc) for recorded_at + Test that _create_gear_payload uses _utcnow() for recorded_at when include_initial_deployment=False (i.e., for position updates). EdgeTech can update latDeg/lonDeg without changing lastUpdated, so From 6e47ab64e41a1924e09d188f503d9364822e4c3b Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Fri, 20 Mar 2026 21:13:56 -0700 Subject: [PATCH 4/4] address PR reviews, cleanup code --- app/actions/edgetech/processor.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index 7e5bcb9d..4b350e72 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -131,9 +131,7 @@ async def _create_gear_payload( ) # Use same logic as main device for recorded_at if include_initial_deployment: - secondary_recorded_at = secondary_last_deployed or datetime.now( - timezone.utc - ) + secondary_recorded_at = secondary_last_deployed or self._utcnow() else: secondary_recorded_at = deployment_recorded_at secondary_device_additional_data = json.loads(end_unit_buoy.json())