diff --git a/README.md b/README.md index c4678d53..ecf52020 100644 --- a/README.md +++ b/README.md @@ -365,7 +365,9 @@ After filtering, we compare EdgeTech data with our existing Earth Ranger records - Action: **Haul** the existing gear (close it), then **Deploy** a new gear set with a new `set_id` - **Processing order**: Haul payload is sent first, then the new deployment payload, so the previous gear is closed before the new one is created - **Why**: When the same serial(s) are deployed again (e.g. same two-unit pair with a new `dateDeployed`), we close the previous deployment in ER/Buoy and create a new one instead of updating the old gear in place -- **Haul timestamp for re-deployments**: When a buoy is hauled and redeployed within seconds, the `currentState.dateRecovered` is cleared by the redeploy. In this case, the haul payload’s `recorded_at` is sourced from the most recent `dateRecovered` in `changeRecords` to avoid colliding with the deploy’s `recorded_at` (which uses `dateDeployed`). Recovery location (`recoveredLatDeg`/`recoveredLonDeg`) is also recovered from `changeRecords` in the same way. +- **Haul timestamp for re-deployments**: The haul payload’s `recorded_at` must not collide with the deploy payload’s `recorded_at` (which uses `dateDeployed`), since both share the same `device_id` and ER enforces a `(device_id, recorded_at)` unique constraint. Timestamps are truncated to seconds (`_remove_milliseconds`), so values within the same second will collide. + - **With `dateRecovered` in `changeRecords`**: When a buoy is hauled and redeployed within seconds, `currentState.dateRecovered` is cleared by the redeploy. The haul `recorded_at` is sourced from the most recent `dateRecovered` in `changeRecords`. Recovery location (`recoveredLatDeg`/`recoveredLonDeg`) is also recovered from `changeRecords`. + - **Without `dateRecovered` (skipped haul)**: EdgeTech may skip the haul stage entirely — the gear goes straight from one deployment to another with no `dateRecovered` anywhere. In this case, `lastUpdated` and `dateDeployed` are typically within the same second (both reflect the new deployment), so the haul uses `dateDeployed - 1 second` as a deterministic `recorded_at` that is guaranteed not to collide with the deploy payload. **2a. DEPLOY (Recovery from missed deployment)** - Buoy exists in **both** EdgeTech and Earth Ranger, but ER gear `status` is **not** `"deployed"` (e.g. `"hauled"`) @@ -1134,7 +1136,7 @@ This integration provides robust synchronization between EdgeTech's Trap Tracker ✅ **Efficient database dump** mechanism for bulk data retrieval ✅ **Intelligent filtering** to process active and explicitly hauled buoys ✅ **Explicit status-based haul detection** (not inferred from absence) -✅ **Re-deployment handling**: when EdgeTech `dateDeployed` is meaningfully later than ER’s deployment, we close the previous gear and create a new one (hauls sent before new deployments). Haul timestamps and recovery locations are sourced from `changeRecords` when a rapid haul+redeploy clears `currentState`. +✅ **Re-deployment handling**: when EdgeTech `dateDeployed` is meaningfully later than ER’s deployment, we close the previous gear and create a new one (hauls sent before new deployments). Haul timestamps and recovery locations are sourced from `changeRecords` when a rapid haul+redeploy clears `currentState`. When EdgeTech skips the haul stage entirely (no `dateRecovered` anywhere), the synthetic haul uses `dateDeployed - 1s` as a deterministic `recorded_at` to avoid collisions with the deploy payload. ✅ **Recovery from missed deployments**: if a re-deployment haul succeeded but the deploy failed, subsequent runs detect the hauled ER gear vs deployed EdgeTech state and create the missing deployment ✅ **Set ID resolution** to correctly update existing vs create new gear sets ✅ **Support for complex systems** including two-unit lines diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index 4b350e72..095ae3bd 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -231,7 +231,10 @@ async def _create_gear_payload( return payload def _create_haul_payload( - self, er_gear: BuoyGear, edgetech_buoy: Optional[Buoy] = None + self, + er_gear: BuoyGear, + edgetech_buoy: Optional[Buoy] = None, + is_redeployment: bool = False, ) -> Dict[str, Any]: """ Create a haul payload from an existing ER gear. @@ -242,6 +245,11 @@ def _create_haul_payload( Args: er_gear: The existing gear from ER edgetech_buoy: Optional EdgeTech buoy data with potential recovery location + is_redeployment: Whether this haul is part of a re-deployment sequence. + When True and no dateRecovered is found, uses dateDeployed - 1s + as a deterministic recorded_at instead of falling back to + lastUpdated, which could collide with the subsequent deploy + payload's recorded_at after millisecond truncation. Returns: Dict in the format expected by /api/v1/gear/ POST endpoint @@ -254,10 +262,24 @@ def _create_haul_payload( recovery_lon = None # Determine the recorded_at timestamp for the haul event - # Use dateRecovered if available; for re-deployments (haul then immediate - # redeploy) the currentState dateRecovered is cleared, so check changeRecords - # for the most recent dateRecovered value. Fall back to lastUpdated. - haul_recorded_at = datetime.now(timezone.utc) + # Priority: dateRecovered from currentState → dateRecovered from changeRecords + # → dateDeployed - 1s (re-deployments without dateRecovered) → lastUpdated. + # + # For re-deployments, only consider changeRecord dateRecovered values that + # occurred *after* the current ER gear's deployment date. Older values + # belong to a previous deploy/haul lifecycle and would produce an + # incorrect (or duplicate) recorded_at. + haul_recorded_at = self._utcnow() + change_record_min_date: Optional[datetime] = None + if is_redeployment: + er_deployment_dates = [ + d.last_deployed + for d in er_gear.devices + if isinstance(getattr(d, "last_deployed", None), datetime) + ] + if er_deployment_dates: + change_record_min_date = min(er_deployment_dates) + if edgetech_buoy: if edgetech_buoy.currentState.dateRecovered: haul_recorded_at = edgetech_buoy.currentState.dateRecovered @@ -266,7 +288,9 @@ def _create_haul_payload( # was overwritten by the redeploy, but the haul timestamp is preserved # in the change history) recovered_at_from_changes = ( - self._get_date_recovered_from_change_records(edgetech_buoy) + self._get_date_recovered_from_change_records( + edgetech_buoy, min_date=change_record_min_date + ) ) if recovered_at_from_changes: haul_recorded_at = recovered_at_from_changes @@ -274,6 +298,15 @@ def _create_haul_payload( f"Using dateRecovered from changeRecords for haul of " f"{edgetech_buoy.serialNumber}: {recovered_at_from_changes}" ) + elif is_redeployment and edgetech_buoy.currentState.dateDeployed: + # For re-deployments without dateRecovered, lastUpdated and + # dateDeployed are typically in the same second (both reflect + # the new deployment). Use dateDeployed - 1s as a deterministic + # haul timestamp that is guaranteed not to collide with the + # deploy payload's recorded_at after millisecond truncation. + haul_recorded_at = ( + edgetech_buoy.currentState.dateDeployed - timedelta(seconds=1) + ) elif edgetech_buoy.currentState.lastUpdated: haul_recorded_at = edgetech_buoy.currentState.lastUpdated @@ -289,7 +322,7 @@ def _create_haul_payload( # Re-deployment: recovery location may be in changeRecords # (currentState was overwritten by the redeploy) rec_lat, rec_lon = self._get_recovery_location_from_change_records( - edgetech_buoy + edgetech_buoy, min_date=change_record_min_date ) if rec_lat is not None and rec_lon is not None: recovery_location_available = True @@ -323,9 +356,7 @@ def _create_haul_payload( if device.last_deployed else device.last_updated.isoformat() ), - "last_updated": self._remove_milliseconds( - datetime.now(timezone.utc) - ).isoformat(), + "last_updated": self._remove_milliseconds(self._utcnow()).isoformat(), "recorded_at": self._remove_milliseconds(haul_recorded_at).isoformat(), "device_status": "hauled", "location": { @@ -366,7 +397,9 @@ def _is_hauled_or_recovered(self, record: Buoy) -> bool: ) @staticmethod - def _get_date_recovered_from_change_records(buoy: Buoy) -> Optional[datetime]: + def _get_date_recovered_from_change_records( + buoy: Buoy, min_date: Optional[datetime] = None + ) -> Optional[datetime]: """ Find the most recent dateRecovered value from a buoy's changeRecords. @@ -374,9 +407,20 @@ def _get_date_recovered_from_change_records(buoy: Buoy) -> Optional[datetime]: immediately redeployed — the currentState no longer has dateRecovered (cleared by the redeploy), but the changeRecords preserve the haul timestamp. + Args: + buoy: The buoy record to search. + min_date: If provided, only consider dateRecovered values that are + after this date. Used during re-deployments to exclude stale + recovery dates from a previous deploy/haul lifecycle. + Returns: The most recent dateRecovered datetime, or None if not found. """ + # recovered_dt is always tz-aware (we append +00:00), but min_date can + # come from BuoyDevice.last_deployed which pydantic does not coerce to + # tz-aware. Normalize naive → UTC to avoid a TypeError on comparison. + if min_date is not None and min_date.tzinfo is None: + min_date = min_date.replace(tzinfo=timezone.utc) most_recent = None for record in buoy.changeRecords: for change in record.changes: @@ -385,6 +429,8 @@ def _get_date_recovered_from_change_records(buoy: Buoy) -> Optional[datetime]: recovered_dt = datetime.fromisoformat( str(change.newValue).replace("Z", "+00:00") ) + if min_date and recovered_dt < min_date: + continue if most_recent is None or recovered_dt > most_recent: most_recent = recovered_dt except (ValueError, TypeError): @@ -394,6 +440,7 @@ def _get_date_recovered_from_change_records(buoy: Buoy) -> Optional[datetime]: @staticmethod def _get_recovery_location_from_change_records( buoy: Buoy, + min_date: Optional[datetime] = None, ) -> Tuple[Optional[float], Optional[float]]: """ Find the most recent recovery location from a buoy's changeRecords. @@ -401,9 +448,20 @@ def _get_recovery_location_from_change_records( Needed for re-deployment scenarios where recoveredLatDeg/recoveredLonDeg in currentState were cleared by the redeploy. + Args: + buoy: The buoy record to search. + min_date: If provided, only consider recovery records whose + dateRecovered value is after this date. Used during + re-deployments to exclude stale data from a prior lifecycle. + Returns: Tuple of (latitude, longitude) or (None, None) if not found. """ + # recovered_dt is always tz-aware (we append +00:00), but min_date can + # come from BuoyDevice.last_deployed which pydantic does not coerce to + # tz-aware. Normalize naive → UTC to avoid a TypeError on comparison. + if min_date is not None and min_date.tzinfo is None: + min_date = min_date.replace(tzinfo=timezone.utc) # Find the changeRecord that set dateRecovered most recently — # its sibling entries will have the recovery coordinates. best_timestamp = None @@ -412,15 +470,24 @@ def _get_recovery_location_from_change_records( for record in buoy.changeRecords: lat = None lon = None - has_recovery = False + recovered_dt = None for change in record.changes: if change.key == "dateRecovered" and change.newValue is not None: - has_recovery = True + try: + recovered_dt = datetime.fromisoformat( + str(change.newValue).replace("Z", "+00:00") + ) + except (ValueError, TypeError): + pass elif change.key == "recoveredLatDeg" and change.newValue is not None: lat = change.newValue elif change.key == "recoveredLonDeg" and change.newValue is not None: lon = change.newValue - if has_recovery and lat is not None and lon is not None: + if recovered_dt is None: + continue + if min_date and recovered_dt < min_date: + continue + if lat is not None and lon is not None: if best_timestamp is None or record.timestamp > best_timestamp: best_timestamp = record.timestamp best_lat = lat @@ -801,9 +868,27 @@ async def process(self) -> List[Dict[str, Any]]: logger.info("Fetching all gears from EarthRanger...") er_gears = await self._er_client.get_er_gears(params={"page_size": 10000}) - er_gears_devices_id_to_gear = { - device.mfr_device_id: gear for gear in er_gears for device in gear.devices - } + # Multiple ER gears can share a mfr_device_id (e.g. a hauled gear from a + # prior lifecycle plus a currently-deployed gear). Prefer the deployed + # gear; among same-status gears, prefer the most recently updated. + # Without this, a leftover hauled gear could win the lookup and trick + # _identify_buoys into creating a duplicate deployment. + er_gears_devices_id_to_gear: Dict[str, BuoyGear] = {} + for gear in er_gears: + for device in gear.devices: + existing = er_gears_devices_id_to_gear.get(device.mfr_device_id) + if existing is None: + er_gears_devices_id_to_gear[device.mfr_device_id] = gear + continue + existing_deployed = existing.status == "deployed" + gear_deployed = gear.status == "deployed" + if existing_deployed and not gear_deployed: + continue + if gear_deployed and not existing_deployed: + er_gears_devices_id_to_gear[device.mfr_device_id] = gear + continue + if gear.last_updated > existing.last_updated: + er_gears_devices_id_to_gear[device.mfr_device_id] = gear to_deploy, to_haul, to_update = await self._identify_buoys( er_gears_devices_id_to_gear, @@ -847,7 +932,9 @@ async def process(self) -> List[Dict[str, Any]]: try: payload = self._create_haul_payload( - er_gear=er_gear, edgetech_buoy=edgetech_buoy + er_gear=er_gear, + edgetech_buoy=edgetech_buoy, + is_redeployment=serial_number_user_id in to_deploy, ) gear_payloads.append(payload) haul_gears_processed.add(er_gear.display_id) diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py index 1c0857b0..2835de33 100644 --- a/app/actions/tests/test_edgetech_processor.py +++ b/app/actions/tests/test_edgetech_processor.py @@ -1994,3 +1994,446 @@ async def test_redeployment_identify_haul_and_deploy(self): assert buoy_key in to_haul assert buoy_key in to_deploy assert len(to_update) == 0 + + def test_redeployment_haul_without_date_recovered_avoids_recorded_at_collision( + self, + ): + """When EdgeTech skips the haul stage (no dateRecovered anywhere) and + dateDeployed/lastUpdated fall in the same second, the haul payload must + NOT reuse lastUpdated as recorded_at — that would collide with the + deploy payload's recorded_at after millisecond truncation.""" + user_id = "684b1ec1c1df05abfa78b756" + serial_number = "88CE99D98B" + hashed_user_id = get_hashed_user_id(user_id) + + # Real-world data: EdgeTech skipped haul, went straight to new deployment. + # dateDeployed and lastUpdated are in the same second. + buoy_data = { + "serialNumber": serial_number, + "userId": user_id, + "currentState": { + "etag": "1774708500700", + "isDeleted": False, + "serialNumber": serial_number, + "releaseCommand": "C8AB8CDC8B", + "statusCommand": serial_number, + "idCommand": "CCCCCCCCCC", + "latDeg": 42.4224917, + "lonDeg": -70.6649972, + "endLatDeg": 42.4268093, + "endLonDeg": -70.6696386, + "modelNumber": "5112", + "isDeployed": True, + "dateDeployed": "2026-03-28T14:35:00.167Z", + "lastUpdated": "2026-03-28T14:35:00.700Z", + }, + "changeRecords": [ + { + "type": "MODIFY", + "timestamp": "2026-03-28T14:35:00.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": "2026-03-05T15:15:45.573Z", + "newValue": "2026-03-28T14:35:00.167Z", + }, + { + "key": "latDeg", + "oldValue": 42.4275164, + "newValue": 42.4224917, + }, + { + "key": "lonDeg", + "oldValue": -70.6701861, + "newValue": -70.6649972, + }, + ], + } + ], + } + + buoy = Buoy.parse_obj(buoy_data) + processor = EdgeTechProcessor(data=[buoy_data], er_token="token", er_url="url") + + device_id_a = f"{serial_number}_{hashed_user_id}_A" + mock_device = BuoyDevice( + device_id="existing-uuid", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation(latitude=42.4275164, longitude=-70.6701861), + last_updated=datetime(2026, 3, 5, 15, 15, 45, tzinfo=timezone.utc), + last_deployed=datetime(2026, 3, 5, 15, 15, 45, tzinfo=timezone.utc), + ) + mock_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-OLD", + status="deployed", + last_updated=datetime(2026, 3, 5, 15, 15, 45, tzinfo=timezone.utc), + devices=[mock_device], + type="single", + manufacturer="edgetech", + ) + + # Haul payload for re-deployment (is_redeployment=True) + haul_payload = processor._create_haul_payload( + er_gear=mock_gear, edgetech_buoy=buoy, is_redeployment=True + ) + + # The haul recorded_at should be dateDeployed - 1s (deterministic, no + # collision with the deploy payload's recorded_at after truncation). + haul_recorded_at = haul_payload["devices"][0]["recorded_at"] + deploy_recorded_at = processor._remove_milliseconds( + buoy.currentState.dateDeployed + ).isoformat() + expected_haul_recorded_at = processor._remove_milliseconds( + buoy.currentState.dateDeployed - timedelta(seconds=1) + ).isoformat() + + assert haul_recorded_at == expected_haul_recorded_at, ( + f"Haul recorded_at ({haul_recorded_at}) must be dateDeployed - 1s " + f"({expected_haul_recorded_at})" + ) + assert haul_recorded_at != deploy_recorded_at, ( + f"Haul recorded_at ({haul_recorded_at}) must differ from deploy " + f"recorded_at ({deploy_recorded_at}) to avoid ER unique constraint collision" + ) + + def test_redeployment_ignores_stale_date_recovered_from_prior_lifecycle(self): + """When changeRecords contain a dateRecovered from a *previous* deploy/haul + lifecycle (before the current ER gear was even deployed), the haul payload + must NOT use that stale value. It should fall through to dateDeployed - 1s. + + Real-world scenario: + 1. Deploy at March 16 + 2. Haul at April 10 (dateRecovered = April 10 19:32:59) + 3. Deploy at April 13 19:05 (clears dateRecovered) + 4. Re-deploy at April 13 19:09 (dateDeployed 19:05 → 19:09) + + When processing step 4, the haul for the 19:05 gear must NOT use the + April 10 dateRecovered — that belongs to the March 16 gear's lifecycle. + """ + user_id = "6228fab6b9923b00705ba333" + serial_number = "1234567890" + hashed_user_id = get_hashed_user_id(user_id) + + buoy_data = { + "serialNumber": serial_number, + "userId": user_id, + "currentState": { + "etag": '"1776107382734"', + "isDeleted": False, + "serialNumber": serial_number, + "releaseCommand": "1234567899", + "statusCommand": serial_number, + "idCommand": "CCCCCCCCCC", + "latDeg": 41.749466138317125, + "lonDeg": -70.74163437237308, + "endLatDeg": 41.74948039627152, + "endLonDeg": -70.741625073152, + "modelNumber": "1234", + "isDeployed": True, + "dateDeployed": "2026-04-13T19:09:28.998Z", + "isTwoUnitLine": False, + "lastUpdated": "2026-04-13T19:09:42.734Z", + }, + "changeRecords": [ + { + "type": "MODIFY", + "timestamp": "2026-04-13T19:09:42.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": "2026-04-13T19:05:54.435Z", + "newValue": "2026-04-13T19:09:28.998Z", + }, + { + "key": "lastUpdated", + "oldValue": "2026-04-13T19:05:54.950Z", + "newValue": "2026-04-13T19:09:42.734Z", + }, + ], + }, + { + "type": "MODIFY", + "timestamp": "2026-04-13T19:05:54.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": None, + "newValue": "2026-04-13T19:05:54.435Z", + }, + { + "key": "dateRecovered", + "oldValue": "2026-04-10T19:32:59.960Z", + "newValue": None, + }, + {"key": "isDeployed", "oldValue": False, "newValue": True}, + ], + }, + { + "type": "MODIFY", + "timestamp": "2026-04-10T19:33:00.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": "2026-03-16T20:36:10.316Z", + "newValue": None, + }, + { + "key": "dateRecovered", + "oldValue": None, + "newValue": "2026-04-10T19:32:59.960Z", + }, + {"key": "isDeployed", "oldValue": True, "newValue": False}, + { + "key": "recoveredLatDeg", + "oldValue": None, + "newValue": 41.70440105394446, + }, + { + "key": "recoveredLonDeg", + "oldValue": None, + "newValue": -70.58701236527317, + }, + ], + }, + ], + } + + buoy = Buoy.parse_obj(buoy_data) + processor = EdgeTechProcessor(data=[buoy_data], er_token="token", er_url="url") + + # ER gear from the 19:05 deployment + device_id_a = f"{serial_number}_{hashed_user_id}_A" + mock_device = BuoyDevice( + device_id="existing-uuid", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation( + latitude=41.749452004389454, longitude=-70.7414179924815 + ), + last_updated=datetime(2026, 4, 13, 19, 5, 54, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 13, 19, 5, 54, tzinfo=timezone.utc), + ) + mock_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-1905", + status="deployed", + last_updated=datetime(2026, 4, 13, 19, 5, 54, tzinfo=timezone.utc), + devices=[mock_device], + type="single", + manufacturer="edgetech", + ) + + haul_payload = processor._create_haul_payload( + er_gear=mock_gear, edgetech_buoy=buoy, is_redeployment=True + ) + + haul_recorded_at = haul_payload["devices"][0]["recorded_at"] + + # Must be dateDeployed - 1s (2026-04-13T19:09:27), NOT the stale + # April 10 dateRecovered from the prior lifecycle + expected = processor._remove_milliseconds( + buoy.currentState.dateDeployed - timedelta(seconds=1) + ).isoformat() + stale_april_10 = "2026-04-10T19:32:59+00:00" + + assert haul_recorded_at != stale_april_10, ( + f"Haul recorded_at ({haul_recorded_at}) must NOT use the stale " + f"April 10 dateRecovered from a prior lifecycle" + ) + assert haul_recorded_at == expected, ( + f"Haul recorded_at ({haul_recorded_at}) should be dateDeployed - 1s " + f"({expected})" + ) + + # Recovery location should NOT come from the stale April 10 changeRecord; + # it should fall back to the ER device's deployed location + assert haul_payload["devices"][0]["location"]["latitude"] == 41.749452004389454 + assert haul_payload["devices"][0]["location"]["longitude"] == -70.7414179924815 + + @pytest.mark.asyncio + async def test_process_prefers_deployed_gear_over_hauled_for_same_device(self): + """When ER returns multiple gears sharing a mfr_device_id (e.g. a hauled + gear from a prior lifecycle plus a currently-deployed gear from the + latest re-deploy), the lookup must pick the deployed one. Otherwise the + hauled gear can win, _identify_buoys falls into the + ``status != 'deployed' and isDeployed`` branch, and a duplicate + gearset is created. + """ + user_id = "6846e8f6e0488a09f1d5b39a" + serial_number = "88CE99D358" + hashed_user_id = get_hashed_user_id(user_id) + + # EdgeTech currentState reflects the latest re-deploy; same dateDeployed + # as the deployed ER gear (no re-deployment, no update needed). + buoy_data = { + "serialNumber": serial_number, + "userId": user_id, + "currentState": { + "etag": '"1776336184788"', + "isDeleted": False, + "serialNumber": serial_number, + "releaseCommand": "C8AB8C7658", + "statusCommand": serial_number, + "idCommand": "CCCCCCCCCC", + "latDeg": 42.4476378, + "lonDeg": -70.608329, + "endLatDeg": 42.455983, + "endLonDeg": -70.616863, + "modelNumber": "5112", + "isDeployed": True, + "dateDeployed": "2026-04-16T10:43:04.182Z", + "isTwoUnitLine": False, + "lastUpdated": "2026-04-16T10:43:04.788Z", + }, + "changeRecords": [], + } + + processor = EdgeTechProcessor(data=[buoy_data], er_token="token", er_url="url") + + device_id_a = f"{serial_number}_{hashed_user_id}_A" + device_id_b = f"{serial_number}_{hashed_user_id}_B" + + # Hauled gear from a prior lifecycle (same device serials). + hauled_device_a = BuoyDevice( + device_id="old-uuid-a", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation(latitude=42.4487014, longitude=-70.6094034), + last_updated=datetime(2026, 4, 16, 10, 34, 35, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 9, 14, 17, 56, tzinfo=timezone.utc), + ) + hauled_device_b = BuoyDevice( + device_id="old-uuid-b", + mfr_device_id=device_id_b, + label="Device B", + location=DeviceLocation(latitude=42.4557331, longitude=-70.6168107), + last_updated=datetime(2026, 4, 16, 10, 34, 35, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 9, 14, 17, 56, tzinfo=timezone.utc), + ) + hauled_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-APRIL-9", + status="hauled", + last_updated=datetime(2026, 4, 16, 10, 34, 35, tzinfo=timezone.utc), + devices=[hauled_device_a, hauled_device_b], + type="trawl", + manufacturer="edgetech", + ) + + # Currently-deployed gear from the April 16 re-deploy. + deployed_device_a = BuoyDevice( + device_id="new-uuid-a", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation(latitude=42.4476378, longitude=-70.608329), + last_updated=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + ) + deployed_device_b = BuoyDevice( + device_id="new-uuid-b", + mfr_device_id=device_id_b, + label="Device B", + location=DeviceLocation(latitude=42.455983, longitude=-70.616863), + last_updated=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + last_deployed=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + ) + deployed_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-APRIL-16", + status="deployed", + last_updated=datetime(2026, 4, 16, 10, 43, 4, tzinfo=timezone.utc), + devices=[deployed_device_a, deployed_device_b], + type="trawl", + manufacturer="edgetech", + ) + + # Deployed gear listed first, hauled gear last — without the dedup + # logic, the hauled gear (last write wins in a dict comprehension) + # would clobber the deployed entry and trigger a duplicate deployment. + mock_er_client = Mock() + mock_er_client.get_er_gears = AsyncMock( + return_value=[deployed_gear, hauled_gear] + ) + mock_er_client.get_sources = AsyncMock(return_value=[]) + processor._er_client = mock_er_client + + gear_payloads = await processor.process() + + # No deployment payload should be generated; the deployed April 16 gear + # already matches the EdgeTech currentState. + deploy_payloads = [ + p + for p in gear_payloads + if any(d.get("device_status") == "deployed" for d in p.get("devices", [])) + and "initial_deployment_date" in p + ] + assert deploy_payloads == [], ( + f"Expected no new deployment, got {len(deploy_payloads)}: " + f"{deploy_payloads}" + ) + + def test_change_record_helpers_accept_naive_min_date(self): + """BuoyDevice.last_deployed is not coerced to tz-aware, so min_date + passed into the change-record helpers can be naive. The helpers must + normalize before comparing against the tz-aware recovered_dt — + otherwise Python raises TypeError on naive vs aware comparison.""" + buoy_data = { + "serialNumber": "88CE99D358", + "userId": "6846e8f6e0488a09f1d5b39a", + "currentState": { + "etag": '"x"', + "isDeleted": False, + "serialNumber": "88CE99D358", + "releaseCommand": "C8AB8C7658", + "statusCommand": "88CE99D358", + "idCommand": "CCCCCCCCCC", + "latDeg": 42.4476378, + "lonDeg": -70.608329, + "modelNumber": "5112", + "isDeployed": True, + "dateDeployed": "2026-04-16T10:43:04.182Z", + "lastUpdated": "2026-04-16T10:43:04.788Z", + }, + "changeRecords": [ + { + "type": "MODIFY", + "timestamp": "2026-04-16T10:34:35.000Z", + "changes": [ + { + "key": "dateRecovered", + "oldValue": None, + "newValue": "2026-04-16T09:54:07.949Z", + }, + { + "key": "recoveredLatDeg", + "oldValue": None, + "newValue": 42.4992445, + }, + { + "key": "recoveredLonDeg", + "oldValue": None, + "newValue": -70.8112983, + }, + ], + }, + ], + } + + buoy = Buoy.parse_obj(buoy_data) + # Naive datetime — simulates an ER device whose last_deployed wasn't + # coerced to tz-aware. + naive_min = datetime(2026, 4, 9, 14, 17, 56) + + # Should not raise TypeError + result = EdgeTechProcessor._get_date_recovered_from_change_records( + buoy, min_date=naive_min + ) + assert result == datetime(2026, 4, 16, 9, 54, 7, 949000, tzinfo=timezone.utc) + + lat, lon = EdgeTechProcessor._get_recovery_location_from_change_records( + buoy, min_date=naive_min + ) + assert lat == 42.4992445 + assert lon == -70.8112983