From e39ec2d54d83501347b0c1f252ac43f7ac27c0ae Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Sat, 28 Mar 2026 09:38:34 -0700 Subject: [PATCH 1/6] Summary of the fix: - processor.py:233-252: Added is_redeployment parameter to _create_haul_payload - processor.py:280-284: When is_redeployment=True and no dateRecovered is found, the method keeps datetime.now(timezone.utc) as recorded_at instead of falling back to lastUpdated (which would truncate to the same second as dateDeployed) - processor.py:855: Pass is_redeployment=serial_number_user_id in to_deploy when calling _create_haul_payload --- README.md | 6 +- app/actions/edgetech/processor.py | 20 +++- app/actions/tests/test_edgetech_processor.py | 97 ++++++++++++++++++++ 3 files changed, 118 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c4678d53..6da0567c 100644 --- a/README.md +++ b/README.md @@ -365,7 +365,9 @@ After filtering, we compare EdgeTech data with our existing Earth Ranger records - Action: **Haul** the existing gear (close it), then **Deploy** a new gear set with a new `set_id` - **Processing order**: Haul payload is sent first, then the new deployment payload, so the previous gear is closed before the new one is created - **Why**: When the same serial(s) are deployed again (e.g. same two-unit pair with a new `dateDeployed`), we close the previous deployment in ER/Buoy and create a new one instead of updating the old gear in place -- **Haul timestamp for re-deployments**: When a buoy is hauled and redeployed within seconds, the `currentState.dateRecovered` is cleared by the redeploy. In this case, the haul payload’s `recorded_at` is sourced from the most recent `dateRecovered` in `changeRecords` to avoid colliding with the deploy’s `recorded_at` (which uses `dateDeployed`). Recovery location (`recoveredLatDeg`/`recoveredLonDeg`) is also recovered from `changeRecords` in the same way. +- **Haul timestamp for re-deployments**: The haul payload’s `recorded_at` must not collide with the deploy payload’s `recorded_at` (which uses `dateDeployed`), since both share the same `device_id` and ER enforces a `(device_id, recorded_at)` unique constraint. Timestamps are truncated to seconds (`_remove_milliseconds`), so values within the same second will collide. + - **With `dateRecovered` in `changeRecords`**: When a buoy is hauled and redeployed within seconds, `currentState.dateRecovered` is cleared by the redeploy. The haul `recorded_at` is sourced from the most recent `dateRecovered` in `changeRecords`. Recovery location (`recoveredLatDeg`/`recoveredLonDeg`) is also recovered from `changeRecords`. + - **Without `dateRecovered` (skipped haul)**: EdgeTech may skip the haul stage entirely — the gear goes straight from one deployment to another with no `dateRecovered` anywhere. In this case, `lastUpdated` and `dateDeployed` are typically within the same second (both reflect the new deployment), so the haul uses `datetime.now(UTC)` as `recorded_at` instead of falling back to `lastUpdated`. **2a. DEPLOY (Recovery from missed deployment)** - Buoy exists in **both** EdgeTech and Earth Ranger, but ER gear `status` is **not** `"deployed"` (e.g. `"hauled"`) @@ -1134,7 +1136,7 @@ This integration provides robust synchronization between EdgeTech's Trap Tracker ✅ **Efficient database dump** mechanism for bulk data retrieval ✅ **Intelligent filtering** to process active and explicitly hauled buoys ✅ **Explicit status-based haul detection** (not inferred from absence) -✅ **Re-deployment handling**: when EdgeTech `dateDeployed` is meaningfully later than ER’s deployment, we close the previous gear and create a new one (hauls sent before new deployments). Haul timestamps and recovery locations are sourced from `changeRecords` when a rapid haul+redeploy clears `currentState`. +✅ **Re-deployment handling**: when EdgeTech `dateDeployed` is meaningfully later than ER’s deployment, we close the previous gear and create a new one (hauls sent before new deployments). Haul timestamps and recovery locations are sourced from `changeRecords` when a rapid haul+redeploy clears `currentState`. When EdgeTech skips the haul stage entirely (no `dateRecovered` anywhere), the synthetic haul uses `datetime.now(UTC)` to avoid `recorded_at` collisions with the deploy payload. ✅ **Recovery from missed deployments**: if a re-deployment haul succeeded but the deploy failed, subsequent runs detect the hauled ER gear vs deployed EdgeTech state and create the missing deployment ✅ **Set ID resolution** to correctly update existing vs create new gear sets ✅ **Support for complex systems** including two-unit lines diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index 4b350e72..eae6a395 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -231,7 +231,10 @@ async def _create_gear_payload( return payload def _create_haul_payload( - self, er_gear: BuoyGear, edgetech_buoy: Optional[Buoy] = None + self, + er_gear: BuoyGear, + edgetech_buoy: Optional[Buoy] = None, + is_redeployment: bool = False, ) -> Dict[str, Any]: """ Create a haul payload from an existing ER gear. @@ -242,6 +245,11 @@ def _create_haul_payload( Args: er_gear: The existing gear from ER edgetech_buoy: Optional EdgeTech buoy data with potential recovery location + is_redeployment: Whether this haul is part of a re-deployment sequence. + When True and no dateRecovered is found, keeps datetime.now() as + recorded_at instead of falling back to lastUpdated, which could + collide with the subsequent deploy payload's recorded_at after + millisecond truncation. Returns: Dict in the format expected by /api/v1/gear/ POST endpoint @@ -274,7 +282,11 @@ def _create_haul_payload( f"Using dateRecovered from changeRecords for haul of " f"{edgetech_buoy.serialNumber}: {recovered_at_from_changes}" ) - elif edgetech_buoy.currentState.lastUpdated: + elif not is_redeployment and edgetech_buoy.currentState.lastUpdated: + # For non-redeployment hauls, lastUpdated reflects the haul. + # For re-deployments, lastUpdated reflects the NEW deployment + # and would collide with the deploy payload's recorded_at + # after millisecond truncation — keep datetime.now() instead. haul_recorded_at = edgetech_buoy.currentState.lastUpdated if edgetech_buoy: @@ -847,7 +859,9 @@ async def process(self) -> List[Dict[str, Any]]: try: payload = self._create_haul_payload( - er_gear=er_gear, edgetech_buoy=edgetech_buoy + er_gear=er_gear, + edgetech_buoy=edgetech_buoy, + is_redeployment=serial_number_user_id in to_deploy, ) gear_payloads.append(payload) haul_gears_processed.add(er_gear.display_id) diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py index 1c0857b0..04b11360 100644 --- a/app/actions/tests/test_edgetech_processor.py +++ b/app/actions/tests/test_edgetech_processor.py @@ -1994,3 +1994,100 @@ async def test_redeployment_identify_haul_and_deploy(self): assert buoy_key in to_haul assert buoy_key in to_deploy assert len(to_update) == 0 + + def test_redeployment_haul_without_date_recovered_avoids_recorded_at_collision( + self, + ): + """When EdgeTech skips the haul stage (no dateRecovered anywhere) and + dateDeployed/lastUpdated fall in the same second, the haul payload must + NOT reuse lastUpdated as recorded_at — that would collide with the + deploy payload's recorded_at after millisecond truncation.""" + user_id = "684b1ec1c1df05abfa78b756" + serial_number = "88CE99D98B" + hashed_user_id = get_hashed_user_id(user_id) + + # Real-world data: EdgeTech skipped haul, went straight to new deployment. + # dateDeployed and lastUpdated are in the same second. + buoy_data = { + "serialNumber": serial_number, + "userId": user_id, + "currentState": { + "etag": "1774708500700", + "isDeleted": False, + "serialNumber": serial_number, + "releaseCommand": "C8AB8CDC8B", + "statusCommand": serial_number, + "idCommand": "CCCCCCCCCC", + "latDeg": 42.4224917, + "lonDeg": -70.6649972, + "endLatDeg": 42.4268093, + "endLonDeg": -70.6696386, + "modelNumber": "5112", + "isDeployed": True, + "dateDeployed": "2026-03-28T14:35:00.167Z", + "lastUpdated": "2026-03-28T14:35:00.700Z", + }, + "changeRecords": [ + { + "type": "MODIFY", + "timestamp": "2026-03-28T14:35:00.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": "2026-03-05T15:15:45.573Z", + "newValue": "2026-03-28T14:35:00.167Z", + }, + { + "key": "latDeg", + "oldValue": 42.4275164, + "newValue": 42.4224917, + }, + { + "key": "lonDeg", + "oldValue": -70.6701861, + "newValue": -70.6649972, + }, + ], + } + ], + } + + buoy = Buoy.parse_obj(buoy_data) + processor = EdgeTechProcessor(data=[buoy_data], er_token="token", er_url="url") + + device_id_a = f"{serial_number}_{hashed_user_id}_A" + mock_device = BuoyDevice( + device_id="existing-uuid", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation(latitude=42.4275164, longitude=-70.6701861), + last_updated=datetime(2026, 3, 5, 15, 15, 45, tzinfo=timezone.utc), + last_deployed=datetime(2026, 3, 5, 15, 15, 45, tzinfo=timezone.utc), + ) + mock_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-OLD", + status="deployed", + last_updated=datetime(2026, 3, 5, 15, 15, 45, tzinfo=timezone.utc), + devices=[mock_device], + type="single", + manufacturer="edgetech", + ) + + # Haul payload for re-deployment (is_redeployment=True) + haul_payload = processor._create_haul_payload( + er_gear=mock_gear, edgetech_buoy=buoy, is_redeployment=True + ) + + # The haul recorded_at should NOT be lastUpdated (2026-03-28T14:35:00Z after + # truncation) because dateDeployed also truncates to the same value, which + # would cause the deploy payload to be rejected by ER's unique constraint. + haul_recorded_at = haul_payload["devices"][0]["recorded_at"] + deploy_recorded_at = processor._remove_milliseconds( + buoy.currentState.dateDeployed + ).isoformat() + + assert haul_recorded_at != deploy_recorded_at, ( + f"Haul recorded_at ({haul_recorded_at}) must differ from deploy " + f"recorded_at ({deploy_recorded_at}) to avoid ER unique constraint collision" + ) From a0d5fce35fe7b02d26355cfcb0e953f161519ac5 Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Sat, 28 Mar 2026 10:32:12 -0700 Subject: [PATCH 2/6] =?UTF-8?q?dateDeployed=20-=201s=20instead=20of=20date?= =?UTF-8?q?time.now()=20(processor.py:285-293)=20=E2=80=94=20deterministic?= =?UTF-8?q?,=20guaranteed=20no=20collision=20regardless=20of=20when=20the?= =?UTF-8?q?=20processor=20runs=20=20=202.=20self.=5Futcnow()=20instead=20o?= =?UTF-8?q?f=20datetime.now(timezone.utc)=20(processor.py:268)=20=E2=80=94?= =?UTF-8?q?=20uses=20the=20existing=20testable=20helper=20for=20the=20defa?= =?UTF-8?q?ult=20fallback=20=20=203.=20Deterministic=20test=20(test=20file?= =?UTF-8?q?)=20=E2=80=94=20asserts=20the=20exact=20expected=20value=20(dat?= =?UTF-8?q?eDeployed=20-=201s)=20instead=20of=20just=20!=3D=20=20=204.=20R?= =?UTF-8?q?EADME=20x2=20=E2=80=94=20updated=20both=20references=20to=20des?= =?UTF-8?q?cribe=20dateDeployed=20-=201s=20instead=20of=20datetime.now(UTC?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 ++-- app/actions/edgetech/processor.py | 17 +++++++++++------ app/actions/tests/test_edgetech_processor.py | 12 +++++++++--- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 6da0567c..ecf52020 100644 --- a/README.md +++ b/README.md @@ -367,7 +367,7 @@ After filtering, we compare EdgeTech data with our existing Earth Ranger records - **Why**: When the same serial(s) are deployed again (e.g. same two-unit pair with a new `dateDeployed`), we close the previous deployment in ER/Buoy and create a new one instead of updating the old gear in place - **Haul timestamp for re-deployments**: The haul payload’s `recorded_at` must not collide with the deploy payload’s `recorded_at` (which uses `dateDeployed`), since both share the same `device_id` and ER enforces a `(device_id, recorded_at)` unique constraint. Timestamps are truncated to seconds (`_remove_milliseconds`), so values within the same second will collide. - **With `dateRecovered` in `changeRecords`**: When a buoy is hauled and redeployed within seconds, `currentState.dateRecovered` is cleared by the redeploy. The haul `recorded_at` is sourced from the most recent `dateRecovered` in `changeRecords`. Recovery location (`recoveredLatDeg`/`recoveredLonDeg`) is also recovered from `changeRecords`. - - **Without `dateRecovered` (skipped haul)**: EdgeTech may skip the haul stage entirely — the gear goes straight from one deployment to another with no `dateRecovered` anywhere. In this case, `lastUpdated` and `dateDeployed` are typically within the same second (both reflect the new deployment), so the haul uses `datetime.now(UTC)` as `recorded_at` instead of falling back to `lastUpdated`. + - **Without `dateRecovered` (skipped haul)**: EdgeTech may skip the haul stage entirely — the gear goes straight from one deployment to another with no `dateRecovered` anywhere. In this case, `lastUpdated` and `dateDeployed` are typically within the same second (both reflect the new deployment), so the haul uses `dateDeployed - 1 second` as a deterministic `recorded_at` that is guaranteed not to collide with the deploy payload. **2a. DEPLOY (Recovery from missed deployment)** - Buoy exists in **both** EdgeTech and Earth Ranger, but ER gear `status` is **not** `"deployed"` (e.g. `"hauled"`) @@ -1136,7 +1136,7 @@ This integration provides robust synchronization between EdgeTech's Trap Tracker ✅ **Efficient database dump** mechanism for bulk data retrieval ✅ **Intelligent filtering** to process active and explicitly hauled buoys ✅ **Explicit status-based haul detection** (not inferred from absence) -✅ **Re-deployment handling**: when EdgeTech `dateDeployed` is meaningfully later than ER’s deployment, we close the previous gear and create a new one (hauls sent before new deployments). Haul timestamps and recovery locations are sourced from `changeRecords` when a rapid haul+redeploy clears `currentState`. When EdgeTech skips the haul stage entirely (no `dateRecovered` anywhere), the synthetic haul uses `datetime.now(UTC)` to avoid `recorded_at` collisions with the deploy payload. +✅ **Re-deployment handling**: when EdgeTech `dateDeployed` is meaningfully later than ER’s deployment, we close the previous gear and create a new one (hauls sent before new deployments). Haul timestamps and recovery locations are sourced from `changeRecords` when a rapid haul+redeploy clears `currentState`. When EdgeTech skips the haul stage entirely (no `dateRecovered` anywhere), the synthetic haul uses `dateDeployed - 1s` as a deterministic `recorded_at` to avoid collisions with the deploy payload. ✅ **Recovery from missed deployments**: if a re-deployment haul succeeded but the deploy failed, subsequent runs detect the hauled ER gear vs deployed EdgeTech state and create the missing deployment ✅ **Set ID resolution** to correctly update existing vs create new gear sets ✅ **Support for complex systems** including two-unit lines diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index eae6a395..72089933 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -265,7 +265,7 @@ def _create_haul_payload( # Use dateRecovered if available; for re-deployments (haul then immediate # redeploy) the currentState dateRecovered is cleared, so check changeRecords # for the most recent dateRecovered value. Fall back to lastUpdated. - haul_recorded_at = datetime.now(timezone.utc) + haul_recorded_at = self._utcnow() if edgetech_buoy: if edgetech_buoy.currentState.dateRecovered: haul_recorded_at = edgetech_buoy.currentState.dateRecovered @@ -282,11 +282,16 @@ def _create_haul_payload( f"Using dateRecovered from changeRecords for haul of " f"{edgetech_buoy.serialNumber}: {recovered_at_from_changes}" ) - elif not is_redeployment and edgetech_buoy.currentState.lastUpdated: - # For non-redeployment hauls, lastUpdated reflects the haul. - # For re-deployments, lastUpdated reflects the NEW deployment - # and would collide with the deploy payload's recorded_at - # after millisecond truncation — keep datetime.now() instead. + elif is_redeployment and edgetech_buoy.currentState.dateDeployed: + # For re-deployments without dateRecovered, lastUpdated and + # dateDeployed are typically in the same second (both reflect + # the new deployment). Use dateDeployed - 1s as a deterministic + # haul timestamp that is guaranteed not to collide with the + # deploy payload's recorded_at after millisecond truncation. + haul_recorded_at = ( + edgetech_buoy.currentState.dateDeployed - timedelta(seconds=1) + ) + elif edgetech_buoy.currentState.lastUpdated: haul_recorded_at = edgetech_buoy.currentState.lastUpdated if edgetech_buoy: diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py index 04b11360..a0a6d8c3 100644 --- a/app/actions/tests/test_edgetech_processor.py +++ b/app/actions/tests/test_edgetech_processor.py @@ -2079,14 +2079,20 @@ def test_redeployment_haul_without_date_recovered_avoids_recorded_at_collision( er_gear=mock_gear, edgetech_buoy=buoy, is_redeployment=True ) - # The haul recorded_at should NOT be lastUpdated (2026-03-28T14:35:00Z after - # truncation) because dateDeployed also truncates to the same value, which - # would cause the deploy payload to be rejected by ER's unique constraint. + # The haul recorded_at should be dateDeployed - 1s (deterministic, no + # collision with the deploy payload's recorded_at after truncation). haul_recorded_at = haul_payload["devices"][0]["recorded_at"] deploy_recorded_at = processor._remove_milliseconds( buoy.currentState.dateDeployed ).isoformat() + expected_haul_recorded_at = processor._remove_milliseconds( + buoy.currentState.dateDeployed - timedelta(seconds=1) + ).isoformat() + assert haul_recorded_at == expected_haul_recorded_at, ( + f"Haul recorded_at ({haul_recorded_at}) must be dateDeployed - 1s " + f"({expected_haul_recorded_at})" + ) assert haul_recorded_at != deploy_recorded_at, ( f"Haul recorded_at ({haul_recorded_at}) must differ from deploy " f"recorded_at ({deploy_recorded_at}) to avoid ER unique constraint collision" From 33d3c81e5b84d3d6abf70468cda89b0e7ae7e072 Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Sat, 28 Mar 2026 10:41:41 -0700 Subject: [PATCH 3/6] last_updated in haul device now uses self._utcnow() instead of datetime.now(timezone.utc) --- app/actions/edgetech/processor.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index 72089933..46fd9320 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -246,10 +246,10 @@ def _create_haul_payload( er_gear: The existing gear from ER edgetech_buoy: Optional EdgeTech buoy data with potential recovery location is_redeployment: Whether this haul is part of a re-deployment sequence. - When True and no dateRecovered is found, keeps datetime.now() as - recorded_at instead of falling back to lastUpdated, which could - collide with the subsequent deploy payload's recorded_at after - millisecond truncation. + When True and no dateRecovered is found, uses dateDeployed - 1s + as a deterministic recorded_at instead of falling back to + lastUpdated, which could collide with the subsequent deploy + payload's recorded_at after millisecond truncation. Returns: Dict in the format expected by /api/v1/gear/ POST endpoint @@ -262,9 +262,8 @@ def _create_haul_payload( recovery_lon = None # Determine the recorded_at timestamp for the haul event - # Use dateRecovered if available; for re-deployments (haul then immediate - # redeploy) the currentState dateRecovered is cleared, so check changeRecords - # for the most recent dateRecovered value. Fall back to lastUpdated. + # Priority: dateRecovered from currentState → dateRecovered from changeRecords + # → dateDeployed - 1s (re-deployments without dateRecovered) → lastUpdated. haul_recorded_at = self._utcnow() if edgetech_buoy: if edgetech_buoy.currentState.dateRecovered: @@ -340,9 +339,7 @@ def _create_haul_payload( if device.last_deployed else device.last_updated.isoformat() ), - "last_updated": self._remove_milliseconds( - datetime.now(timezone.utc) - ).isoformat(), + "last_updated": self._remove_milliseconds(self._utcnow()).isoformat(), "recorded_at": self._remove_milliseconds(haul_recorded_at).isoformat(), "device_status": "hauled", "location": { From 14b9ccd91f0e0b8dbd9eec58eca408fcdcb01d21 Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Tue, 14 Apr 2026 08:24:54 -0700 Subject: [PATCH 4/6] fix: when the change data still contains a previous deploy and haul, ignore that data if the change data also contains two more deploys. For the later two deploys, we want to treat the first deploy as hauled, then only deploy the final --- app/actions/edgetech/processor.py | 55 ++++++- app/actions/tests/test_edgetech_processor.py | 154 +++++++++++++++++++ 2 files changed, 203 insertions(+), 6 deletions(-) diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index 46fd9320..44039ffc 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -264,7 +264,22 @@ def _create_haul_payload( # Determine the recorded_at timestamp for the haul event # Priority: dateRecovered from currentState → dateRecovered from changeRecords # → dateDeployed - 1s (re-deployments without dateRecovered) → lastUpdated. + # + # For re-deployments, only consider changeRecord dateRecovered values that + # occurred *after* the current ER gear's deployment date. Older values + # belong to a previous deploy/haul lifecycle and would produce an + # incorrect (or duplicate) recorded_at. haul_recorded_at = self._utcnow() + change_record_min_date: Optional[datetime] = None + if is_redeployment: + er_deployment_dates = [ + d.last_deployed + for d in er_gear.devices + if isinstance(getattr(d, "last_deployed", None), datetime) + ] + if er_deployment_dates: + change_record_min_date = min(er_deployment_dates) + if edgetech_buoy: if edgetech_buoy.currentState.dateRecovered: haul_recorded_at = edgetech_buoy.currentState.dateRecovered @@ -273,7 +288,9 @@ def _create_haul_payload( # was overwritten by the redeploy, but the haul timestamp is preserved # in the change history) recovered_at_from_changes = ( - self._get_date_recovered_from_change_records(edgetech_buoy) + self._get_date_recovered_from_change_records( + edgetech_buoy, min_date=change_record_min_date + ) ) if recovered_at_from_changes: haul_recorded_at = recovered_at_from_changes @@ -305,7 +322,7 @@ def _create_haul_payload( # Re-deployment: recovery location may be in changeRecords # (currentState was overwritten by the redeploy) rec_lat, rec_lon = self._get_recovery_location_from_change_records( - edgetech_buoy + edgetech_buoy, min_date=change_record_min_date ) if rec_lat is not None and rec_lon is not None: recovery_location_available = True @@ -380,7 +397,9 @@ def _is_hauled_or_recovered(self, record: Buoy) -> bool: ) @staticmethod - def _get_date_recovered_from_change_records(buoy: Buoy) -> Optional[datetime]: + def _get_date_recovered_from_change_records( + buoy: Buoy, min_date: Optional[datetime] = None + ) -> Optional[datetime]: """ Find the most recent dateRecovered value from a buoy's changeRecords. @@ -388,6 +407,12 @@ def _get_date_recovered_from_change_records(buoy: Buoy) -> Optional[datetime]: immediately redeployed — the currentState no longer has dateRecovered (cleared by the redeploy), but the changeRecords preserve the haul timestamp. + Args: + buoy: The buoy record to search. + min_date: If provided, only consider dateRecovered values that are + after this date. Used during re-deployments to exclude stale + recovery dates from a previous deploy/haul lifecycle. + Returns: The most recent dateRecovered datetime, or None if not found. """ @@ -399,6 +424,8 @@ def _get_date_recovered_from_change_records(buoy: Buoy) -> Optional[datetime]: recovered_dt = datetime.fromisoformat( str(change.newValue).replace("Z", "+00:00") ) + if min_date and recovered_dt < min_date: + continue if most_recent is None or recovered_dt > most_recent: most_recent = recovered_dt except (ValueError, TypeError): @@ -408,6 +435,7 @@ def _get_date_recovered_from_change_records(buoy: Buoy) -> Optional[datetime]: @staticmethod def _get_recovery_location_from_change_records( buoy: Buoy, + min_date: Optional[datetime] = None, ) -> Tuple[Optional[float], Optional[float]]: """ Find the most recent recovery location from a buoy's changeRecords. @@ -415,6 +443,12 @@ def _get_recovery_location_from_change_records( Needed for re-deployment scenarios where recoveredLatDeg/recoveredLonDeg in currentState were cleared by the redeploy. + Args: + buoy: The buoy record to search. + min_date: If provided, only consider recovery records whose + dateRecovered value is after this date. Used during + re-deployments to exclude stale data from a prior lifecycle. + Returns: Tuple of (latitude, longitude) or (None, None) if not found. """ @@ -426,15 +460,24 @@ def _get_recovery_location_from_change_records( for record in buoy.changeRecords: lat = None lon = None - has_recovery = False + recovered_dt = None for change in record.changes: if change.key == "dateRecovered" and change.newValue is not None: - has_recovery = True + try: + recovered_dt = datetime.fromisoformat( + str(change.newValue).replace("Z", "+00:00") + ) + except (ValueError, TypeError): + pass elif change.key == "recoveredLatDeg" and change.newValue is not None: lat = change.newValue elif change.key == "recoveredLonDeg" and change.newValue is not None: lon = change.newValue - if has_recovery and lat is not None and lon is not None: + if recovered_dt is None: + continue + if min_date and recovered_dt < min_date: + continue + if lat is not None and lon is not None: if best_timestamp is None or record.timestamp > best_timestamp: best_timestamp = record.timestamp best_lat = lat diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py index a0a6d8c3..f7ffc014 100644 --- a/app/actions/tests/test_edgetech_processor.py +++ b/app/actions/tests/test_edgetech_processor.py @@ -2097,3 +2097,157 @@ def test_redeployment_haul_without_date_recovered_avoids_recorded_at_collision( f"Haul recorded_at ({haul_recorded_at}) must differ from deploy " f"recorded_at ({deploy_recorded_at}) to avoid ER unique constraint collision" ) + + def test_redeployment_ignores_stale_date_recovered_from_prior_lifecycle(self): + """When changeRecords contain a dateRecovered from a *previous* deploy/haul + lifecycle (before the current ER gear was even deployed), the haul payload + must NOT use that stale value. It should fall through to dateDeployed - 1s. + + Real-world scenario: + 1. Deploy at March 16 + 2. Haul at April 10 (dateRecovered = April 10 19:32:59) + 3. Deploy at April 13 19:05 (clears dateRecovered) + 4. Re-deploy at April 13 19:09 (dateDeployed 19:05 → 19:09) + + When processing step 4, the haul for the 19:05 gear must NOT use the + April 10 dateRecovered — that belongs to the March 16 gear's lifecycle. + """ + user_id = "6228fab6b9923b00705ba333" + serial_number = "1234567890" + hashed_user_id = get_hashed_user_id(user_id) + + buoy_data = { + "serialNumber": serial_number, + "userId": user_id, + "currentState": { + "etag": '"1776107382734"', + "isDeleted": False, + "serialNumber": serial_number, + "releaseCommand": "1234567899", + "statusCommand": serial_number, + "idCommand": "CCCCCCCCCC", + "latDeg": 41.749466138317125, + "lonDeg": -70.74163437237308, + "endLatDeg": 41.74948039627152, + "endLonDeg": -70.741625073152, + "modelNumber": "1234", + "isDeployed": True, + "dateDeployed": "2026-04-13T19:09:28.998Z", + "isTwoUnitLine": False, + "lastUpdated": "2026-04-13T19:09:42.734Z", + }, + "changeRecords": [ + { + "type": "MODIFY", + "timestamp": "2026-04-13T19:09:42.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": "2026-04-13T19:05:54.435Z", + "newValue": "2026-04-13T19:09:28.998Z", + }, + { + "key": "lastUpdated", + "oldValue": "2026-04-13T19:05:54.950Z", + "newValue": "2026-04-13T19:09:42.734Z", + }, + ], + }, + { + "type": "MODIFY", + "timestamp": "2026-04-13T19:05:54.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": None, + "newValue": "2026-04-13T19:05:54.435Z", + }, + { + "key": "dateRecovered", + "oldValue": "2026-04-10T19:32:59.960Z", + "newValue": None, + }, + {"key": "isDeployed", "oldValue": False, "newValue": True}, + ], + }, + { + "type": "MODIFY", + "timestamp": "2026-04-10T19:33:00.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": "2026-03-16T20:36:10.316Z", + "newValue": None, + }, + { + "key": "dateRecovered", + "oldValue": None, + "newValue": "2026-04-10T19:32:59.960Z", + }, + {"key": "isDeployed", "oldValue": True, "newValue": False}, + { + "key": "recoveredLatDeg", + "oldValue": None, + "newValue": 41.70440105394446, + }, + { + "key": "recoveredLonDeg", + "oldValue": None, + "newValue": -70.58701236527317, + }, + ], + }, + ], + } + + buoy = Buoy.parse_obj(buoy_data) + processor = EdgeTechProcessor(data=[buoy_data], er_token="token", er_url="url") + + # ER gear from the 19:05 deployment + device_id_a = f"{serial_number}_{hashed_user_id}_A" + mock_device = BuoyDevice( + device_id="existing-uuid", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation( + latitude=41.749452004389454, longitude=-70.7414179924815 + ), + last_updated=datetime(2026, 4, 13, 19, 5, 54, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 13, 19, 5, 54, tzinfo=timezone.utc), + ) + mock_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-1905", + status="deployed", + last_updated=datetime(2026, 4, 13, 19, 5, 54, tzinfo=timezone.utc), + devices=[mock_device], + type="single", + manufacturer="edgetech", + ) + + haul_payload = processor._create_haul_payload( + er_gear=mock_gear, edgetech_buoy=buoy, is_redeployment=True + ) + + haul_recorded_at = haul_payload["devices"][0]["recorded_at"] + + # Must be dateDeployed - 1s (2026-04-13T19:09:27), NOT the stale + # April 10 dateRecovered from the prior lifecycle + expected = processor._remove_milliseconds( + buoy.currentState.dateDeployed - timedelta(seconds=1) + ).isoformat() + stale_april_10 = "2026-04-10T19:32:59+00:00" + + assert haul_recorded_at != stale_april_10, ( + f"Haul recorded_at ({haul_recorded_at}) must NOT use the stale " + f"April 10 dateRecovered from a prior lifecycle" + ) + assert haul_recorded_at == expected, ( + f"Haul recorded_at ({haul_recorded_at}) should be dateDeployed - 1s " + f"({expected})" + ) + + # Recovery location should NOT come from the stale April 10 changeRecord; + # it should fall back to the ER device's deployed location + assert haul_payload["devices"][0]["location"]["latitude"] == 41.749452004389454 + assert haul_payload["devices"][0]["location"]["longitude"] == -70.7414179924815 From bee70f363423fca057b30ae7275ab5446ecf64a3 Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Thu, 16 Apr 2026 10:42:30 -0700 Subject: [PATCH 5/6] fix: another issue with one gearset record in Edgetech recording several missed hauls resulted in a duplicate gearset in ER. --- app/actions/edgetech/processor.py | 24 +++- app/actions/tests/test_edgetech_processor.py | 122 +++++++++++++++++++ 2 files changed, 143 insertions(+), 3 deletions(-) diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index 44039ffc..945c6010 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -858,9 +858,27 @@ async def process(self) -> List[Dict[str, Any]]: logger.info("Fetching all gears from EarthRanger...") er_gears = await self._er_client.get_er_gears(params={"page_size": 10000}) - er_gears_devices_id_to_gear = { - device.mfr_device_id: gear for gear in er_gears for device in gear.devices - } + # Multiple ER gears can share a mfr_device_id (e.g. a hauled gear from a + # prior lifecycle plus a currently-deployed gear). Prefer the deployed + # gear; among same-status gears, prefer the most recently updated. + # Without this, a leftover hauled gear could win the lookup and trick + # _identify_buoys into creating a duplicate deployment. + er_gears_devices_id_to_gear: Dict[str, BuoyGear] = {} + for gear in er_gears: + for device in gear.devices: + existing = er_gears_devices_id_to_gear.get(device.mfr_device_id) + if existing is None: + er_gears_devices_id_to_gear[device.mfr_device_id] = gear + continue + existing_deployed = existing.status == "deployed" + gear_deployed = gear.status == "deployed" + if existing_deployed and not gear_deployed: + continue + if gear_deployed and not existing_deployed: + er_gears_devices_id_to_gear[device.mfr_device_id] = gear + continue + if gear.last_updated > existing.last_updated: + er_gears_devices_id_to_gear[device.mfr_device_id] = gear to_deploy, to_haul, to_update = await self._identify_buoys( er_gears_devices_id_to_gear, diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py index f7ffc014..a8389876 100644 --- a/app/actions/tests/test_edgetech_processor.py +++ b/app/actions/tests/test_edgetech_processor.py @@ -2251,3 +2251,125 @@ def test_redeployment_ignores_stale_date_recovered_from_prior_lifecycle(self): # it should fall back to the ER device's deployed location assert haul_payload["devices"][0]["location"]["latitude"] == 41.749452004389454 assert haul_payload["devices"][0]["location"]["longitude"] == -70.7414179924815 + + @pytest.mark.asyncio + async def test_process_prefers_deployed_gear_over_hauled_for_same_device(self): + """When ER returns multiple gears sharing a mfr_device_id (e.g. a hauled + gear from a prior lifecycle plus a currently-deployed gear from the + latest re-deploy), the lookup must pick the deployed one. Otherwise the + hauled gear can win, _identify_buoys falls into the + ``status != 'deployed' and isDeployed`` branch, and a duplicate + gearset is created. + """ + user_id = "6846e8f6e0488a09f1d5b39a" + serial_number = "88CE99D358" + hashed_user_id = get_hashed_user_id(user_id) + + # EdgeTech currentState reflects the latest re-deploy; same dateDeployed + # as the deployed ER gear (no re-deployment, no update needed). + buoy_data = { + "serialNumber": serial_number, + "userId": user_id, + "currentState": { + "etag": '"1776336184788"', + "isDeleted": False, + "serialNumber": serial_number, + "releaseCommand": "C8AB8C7658", + "statusCommand": serial_number, + "idCommand": "CCCCCCCCCC", + "latDeg": 42.4476378, + "lonDeg": -70.608329, + "endLatDeg": 42.455983, + "endLonDeg": -70.616863, + "modelNumber": "5112", + "isDeployed": True, + "dateDeployed": "2026-04-16T10:43:04.182Z", + "isTwoUnitLine": False, + "lastUpdated": "2026-04-16T10:43:04.788Z", + }, + "changeRecords": [], + } + + processor = EdgeTechProcessor(data=[buoy_data], er_token="token", er_url="url") + + device_id_a = f"{serial_number}_{hashed_user_id}_A" + device_id_b = f"{serial_number}_{hashed_user_id}_B" + + # Hauled gear from a prior lifecycle (same device serials). + hauled_device_a = BuoyDevice( + device_id="old-uuid-a", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation(latitude=42.4487014, longitude=-70.6094034), + last_updated=datetime(2026, 4, 16, 10, 34, 35, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 9, 14, 17, 56, tzinfo=timezone.utc), + ) + hauled_device_b = BuoyDevice( + device_id="old-uuid-b", + mfr_device_id=device_id_b, + label="Device B", + location=DeviceLocation(latitude=42.4557331, longitude=-70.6168107), + last_updated=datetime(2026, 4, 16, 10, 34, 35, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 9, 14, 17, 56, tzinfo=timezone.utc), + ) + hauled_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-APRIL-9", + status="hauled", + last_updated=datetime(2026, 4, 16, 10, 34, 35, tzinfo=timezone.utc), + devices=[hauled_device_a, hauled_device_b], + type="trawl", + manufacturer="edgetech", + ) + + # Currently-deployed gear from the April 16 re-deploy. + deployed_device_a = BuoyDevice( + device_id="new-uuid-a", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation(latitude=42.4476378, longitude=-70.608329), + last_updated=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + ) + deployed_device_b = BuoyDevice( + device_id="new-uuid-b", + mfr_device_id=device_id_b, + label="Device B", + location=DeviceLocation(latitude=42.455983, longitude=-70.616863), + last_updated=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + ) + deployed_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-APRIL-16", + status="deployed", + last_updated=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + devices=[deployed_device_a, deployed_device_b], + type="trawl", + manufacturer="edgetech", + ) + + # Deployed gear listed first, hauled gear last — without the dedup + # logic, the hauled gear (last write wins in a dict comprehension) + # would clobber the deployed entry and trigger a duplicate deployment. + mock_er_client = Mock() + mock_er_client.get_er_gears = AsyncMock( + return_value=[deployed_gear, hauled_gear] + ) + mock_er_client.get_sources = AsyncMock(return_value=[]) + processor._er_client = mock_er_client + + gear_payloads = await processor.process() + + # No deployment payload should be generated; the deployed April 16 gear + # already matches the EdgeTech currentState. + deploy_payloads = [ + p + for p in gear_payloads + if any(d.get("device_status") == "deployed" for d in p.get("devices", [])) + and "initial_deployment_date" in p + ] + assert deploy_payloads == [], ( + f"Expected no new deployment, got {len(deploy_payloads)}: " + f"{deploy_payloads}" + ) From 4fcef1c2a5c97845f26dc571441b4d16ececad0b Mon Sep 17 00:00:00 2001 From: chrisj-er <“chrisj@earthranger.com”> Date: Thu, 16 Apr 2026 12:26:16 -0700 Subject: [PATCH 6/6] fix: helpers now coerce to timezone aware. --- app/actions/edgetech/processor.py | 10 +++ app/actions/tests/test_edgetech_processor.py | 64 ++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index 945c6010..095ae3bd 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -416,6 +416,11 @@ def _get_date_recovered_from_change_records( Returns: The most recent dateRecovered datetime, or None if not found. """ + # recovered_dt is always tz-aware (we append +00:00), but min_date can + # come from BuoyDevice.last_deployed which pydantic does not coerce to + # tz-aware. Normalize naive → UTC to avoid a TypeError on comparison. + if min_date is not None and min_date.tzinfo is None: + min_date = min_date.replace(tzinfo=timezone.utc) most_recent = None for record in buoy.changeRecords: for change in record.changes: @@ -452,6 +457,11 @@ def _get_recovery_location_from_change_records( Returns: Tuple of (latitude, longitude) or (None, None) if not found. """ + # recovered_dt is always tz-aware (we append +00:00), but min_date can + # come from BuoyDevice.last_deployed which pydantic does not coerce to + # tz-aware. Normalize naive → UTC to avoid a TypeError on comparison. + if min_date is not None and min_date.tzinfo is None: + min_date = min_date.replace(tzinfo=timezone.utc) # Find the changeRecord that set dateRecovered most recently — # its sibling entries will have the recovery coordinates. best_timestamp = None diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py index a8389876..2835de33 100644 --- a/app/actions/tests/test_edgetech_processor.py +++ b/app/actions/tests/test_edgetech_processor.py @@ -2373,3 +2373,67 @@ async def test_process_prefers_deployed_gear_over_hauled_for_same_device(self): f"Expected no new deployment, got {len(deploy_payloads)}: " f"{deploy_payloads}" ) + + def test_change_record_helpers_accept_naive_min_date(self): + """BuoyDevice.last_deployed is not coerced to tz-aware, so min_date + passed into the change-record helpers can be naive. The helpers must + normalize before comparing against the tz-aware recovered_dt — + otherwise Python raises TypeError on naive vs aware comparison.""" + buoy_data = { + "serialNumber": "88CE99D358", + "userId": "6846e8f6e0488a09f1d5b39a", + "currentState": { + "etag": '"x"', + "isDeleted": False, + "serialNumber": "88CE99D358", + "releaseCommand": "C8AB8C7658", + "statusCommand": "88CE99D358", + "idCommand": "CCCCCCCCCC", + "latDeg": 42.4476378, + "lonDeg": -70.608329, + "modelNumber": "5112", + "isDeployed": True, + "dateDeployed": "2026-04-16T10:43:04.182Z", + "lastUpdated": "2026-04-16T10:43:04.788Z", + }, + "changeRecords": [ + { + "type": "MODIFY", + "timestamp": "2026-04-16T10:34:35.000Z", + "changes": [ + { + "key": "dateRecovered", + "oldValue": None, + "newValue": "2026-04-16T09:54:07.949Z", + }, + { + "key": "recoveredLatDeg", + "oldValue": None, + "newValue": 42.4992445, + }, + { + "key": "recoveredLonDeg", + "oldValue": None, + "newValue": -70.8112983, + }, + ], + }, + ], + } + + buoy = Buoy.parse_obj(buoy_data) + # Naive datetime — simulates an ER device whose last_deployed wasn't + # coerced to tz-aware. + naive_min = datetime(2026, 4, 9, 14, 17, 56) + + # Should not raise TypeError + result = EdgeTechProcessor._get_date_recovered_from_change_records( + buoy, min_date=naive_min + ) + assert result == datetime(2026, 4, 16, 9, 54, 7, 949000, tzinfo=timezone.utc) + + lat, lon = EdgeTechProcessor._get_recovery_location_from_change_records( + buoy, min_date=naive_min + ) + assert lat == 42.4992445 + assert lon == -70.8112983