diff --git a/README.md b/README.md index 59cfc40d..c4678d53 100644 --- a/README.md +++ b/README.md @@ -263,6 +263,7 @@ EdgeTech supports buoy systems with two physical units connected by a line: - The start unit (`startUnit: null`) initiates processing - The end unit (`endUnit: null`) is skipped (processed as part of start unit) - If end unit is missing, the start unit is skipped with a warning +- **Circular two-unit protection**: If the same two devices are each configured as the start unit with the other as its end unit (e.g. A.endUnit=B and B.endUnit=A), only one gear set is created. The duplicate start unit is skipped (the canonical lead is the device with the smaller serial number), and a warning is logged. --- @@ -358,13 +359,29 @@ After filtering, we compare EdgeTech data with our existing Earth Ranger records - Standard: `{serialNumber}_{hashedUserId}` - Action: Create deployment gear payload with new UUID as `set_id` -**2. UPDATE (Location Changes)** +**2. RE-DEPLOY (Same units, new deployment)** +- Buoy exists in **both** EdgeTech and Earth Ranger, and ER gear is still `status: "deployed"` +- EdgeTech **`dateDeployed`** is **more than 1 minute after** the ER gear's deployment time (so we treat it as a new deployment, not an update) +- Action: **Haul** the existing gear (close it), then **Deploy** a new gear set with a new `set_id` +- **Processing order**: Haul payload is sent first, then the new deployment payload, so the previous gear is closed before the new one is created +- **Why**: When the same serial(s) are deployed again (e.g. same two-unit pair with a new `dateDeployed`), we close the previous deployment in ER/Buoy and create a new one instead of updating the old gear in place +- **Haul timestamp for re-deployments**: When a buoy is hauled and redeployed within seconds, the `currentState.dateRecovered` is cleared by the redeploy. 
In this case, the haul payload's `recorded_at` is sourced from the most recent `dateRecovered` in `changeRecords` to avoid colliding with the deploy's `recorded_at` (which uses `dateDeployed`). The recovery location (`recoveredLatDeg`/`recoveredLonDeg`) is likewise sourced from `changeRecords`. + +**2a. DEPLOY (Recovery from missed deployment)** +- Buoy exists in **both** EdgeTech and Earth Ranger, but ER gear `status` is **not** `"deployed"` (e.g. `"hauled"`) +- EdgeTech shows `isDeployed: true` +- **Why**: This handles the case where a previous re-deployment haul succeeded but the deploy failed (e.g. `recorded_at` collision). On subsequent processor runs, the hauled ER gear + deployed EdgeTech state is recognized as a missed deployment. +- Action: Create deployment gear payload with new UUID as `set_id` (no haul needed since gear is already hauled) + +**3. UPDATE (Location Changes)** - Buoy exists in both systems -- EdgeTech `lastUpdated` > Earth Ranger `last_updated`, OR location changed +- Location has changed (primary device coordinates differ from ER) - Buoy still marked as `isDeployed: true` and `isDeleted: false` -- Action: Create update gear payload using existing ER gear's `set_id` +- **Not** a re-deployment (EdgeTech `dateDeployed` is not more than 1 minute after ER gear's deployment) +- **Note**: Buoys are initially identified for update when `lastUpdated > last_updated` OR location changed, but during payload generation, updates are only sent when the location has actually changed. Metadata-only updates (newer timestamp, same location) are skipped. +- Action: Create update gear payload using existing ER gear's `set_id` -**3. HAUL (Retrievals)** +**4. 
HAUL (Retrievals)** - Buoy exists in both systems, but EdgeTech **explicitly** marks it as: - `isDeleted: true`, OR - `isDeployed: false` @@ -372,7 +389,7 @@ After filtering, we compare EdgeTech data with our existing Earth Ranger records - **Important**: Absence from EdgeTech data does NOT trigger a haul - Action: Create haul gear payload using existing ER gear's `set_id` -**4. NO-OP (Skip)** +**5. NO-OP (Skip)** - Buoy exists in both systems - No location change detected - Same `lastUpdated` timestamp @@ -420,6 +437,7 @@ er_gear = er_gears_devices_id_to_gear.get(primary_key) \ | Scenario | ER Gear Found? | set_id Source | |----------|----------------|---------------| | **New Deployment** | No | Generate new UUID: `str(uuid4())` | +| **Re-deployment** | Yes | Haul: use ER gear's `display_id`. Deploy: generate new UUID | | **Update Existing** | Yes | Use ER gear's `id` field (UUID) | | **Haul Existing** | Yes | Use ER gear's `display_id` field | @@ -859,8 +877,9 @@ When EdgeTech explicitly marks a buoy as `isDeleted: true` or `isDeployed: false ``` **Location Priority for Hauls**: -1. Recovery location from EdgeTech (`recoveredLatDeg`/`recoveredLonDeg`) if available -2. Fallback to last deployed location from Earth Ranger +1. Recovery location from EdgeTech `currentState` (`recoveredLatDeg`/`recoveredLonDeg`) if available +2. Recovery location from EdgeTech `changeRecords` (for re-deployments where `currentState` was overwritten) +3. Fallback to last deployed location from Earth Ranger **Note**: All devices in the gear set use the same recovery location since there's only one recovery point. @@ -956,21 +975,23 @@ Result: Entire system skipped if either unit missing ▼ ┌─────────────────────────────────────────────────────────────┐ │ 4. 
IDENTIFY OPERATIONS │ -│ DEPLOY: In EdgeTech (deployed), not in ER │ -│ UPDATE: In both, EdgeTech newer + location changed │ -│ HAUL: In both, EdgeTech isDeleted/!isDeployed │ -│ (Absence from EdgeTech does NOT trigger haul) │ +│ DEPLOY: In EdgeTech (deployed), not in ER │ +│ RE-DEPLOY: In both, EdgeTech dateDeployed > ER + 1 min │ +│ → Haul existing gear, then deploy new │ +│ RECOVERY: ER gear hauled but EdgeTech isDeployed=true │ +│ → Deploy new gear (haul already done) │ +│ UPDATE: In both, EdgeTech newer + location changed │ +│ HAUL: In both, EdgeTech isDeleted/!isDeployed │ +│ (Absence from EdgeTech does NOT trigger haul) │ └────────────────┬────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ -│ 5. GENERATE GEAR PAYLOADS │ -│ For each operation: │ -│ - Resolve set_id (new UUID or existing ER ID) │ -│ - Resolve device_id (new UUID or existing source ID) │ -│ - Extract locations │ -│ - Set device_status (deployed/hauled) │ -│ - Build gear payload JSON │ +│ 5. GENERATE GEAR PAYLOADS (order matters) │ +│ a) Hauls first (close existing gears) │ +│ b) Deployments (new gear sets) │ +│ c) Updates (location/status changes) │ +│ For each: set_id, device_id, locations, device_status │ └────────────────┬────────────────────────────────────────────┘ │ ▼ @@ -1113,6 +1134,8 @@ This integration provides robust synchronization between EdgeTech's Trap Tracker ✅ **Efficient database dump** mechanism for bulk data retrieval ✅ **Intelligent filtering** to process active and explicitly hauled buoys ✅ **Explicit status-based haul detection** (not inferred from absence) +✅ **Re-deployment handling**: when EdgeTech `dateDeployed` is meaningfully later than ER’s deployment, we close the previous gear and create a new one (hauls sent before new deployments). Haul timestamps and recovery locations are sourced from `changeRecords` when a rapid haul+redeploy clears `currentState`. 
+✅ **Recovery from missed deployments**: if a re-deployment haul succeeded but the deploy failed, subsequent runs detect the hauled ER gear vs deployed EdgeTech state and create the missing deployment ✅ **Set ID resolution** to correctly update existing vs create new gear sets ✅ **Support for complex systems** including two-unit lines ✅ **Standardized gear payload format** for the Buoy API diff --git a/app/actions/edgetech/processor.py b/app/actions/edgetech/processor.py index f908b922..bb72ddd0 100644 --- a/app/actions/edgetech/processor.py +++ b/app/actions/edgetech/processor.py @@ -5,7 +5,7 @@ from uuid import uuid4 from app.actions.buoy import BuoyClient -from app.actions.buoy.types import BuoyGear +from app.actions.buoy.types import BuoyDevice, BuoyGear from app.actions.edgetech.types import Buoy from app.actions.utils import get_hashed_user_id @@ -65,6 +65,7 @@ async def _create_gear_payload( device_status: str, manufacturer_id_to_source_id: Dict[str, str], end_unit_buoy: Optional[Buoy] = None, + end_unit_device_from_er: Optional[BuoyDevice] = None, set_id: Optional[str] = None, include_initial_deployment: bool = True, ) -> Dict[str, Any]: @@ -75,7 +76,10 @@ async def _create_gear_payload( buoy: The main Buoy object device_status: Status of the device (deployed/hauled) manufacturer_id_to_source_id: Mapping of manufacturer_id to source_id for existing sources - end_unit_buoy: Optional second buoy for two-unit lines + end_unit_buoy: Optional second buoy for two-unit lines (from EdgeTech sync window) + end_unit_device_from_er: Optional end-unit device from ER when end_unit_buoy is not + in the sync window; used for updates and deployments (e.g. re-deployments or + recovery deployments where the end unit wasn't in the EdgeTech sync window). 
set_id: Optional gear set ID (auto-generated if not provided) include_initial_deployment: Whether to include initial_deployment_date @@ -130,6 +134,34 @@ async def _create_gear_payload( ) secondary_device_additional_data = json.loads(end_unit_buoy.json()) secondary_device_additional_data.pop("changeRecords", None) + elif end_unit_device_from_er: + # End unit not in EdgeTech sync window (e.g. location-only update on start unit, + # or re-deployment where end unit wasn't updated in EdgeTech); + # use current state from ER so we can still send the payload. + secondary_device_id = end_unit_device_from_er.mfr_device_id + secondary_latitude = end_unit_device_from_er.location.latitude + secondary_longitude = end_unit_device_from_er.location.longitude + # For initial deployments (including re-deployments), use the start + # unit's deployment time so both devices reflect the new deployment. + # For updates, use the ER value (we're not re-deploying). + if include_initial_deployment: + secondary_last_deployed = last_deployed + else: + secondary_last_deployed = ( + end_unit_device_from_er.last_deployed or last_updated + ) + # Use same recorded_at as start unit so we don't resend an existing + # (device_id, recorded_at) pair and so both devices are consistent. + # ER-sourced last_updated is kept in device_additional_data for traceability. 
+ secondary_recorded_at = deployment_recorded_at + secondary_device_additional_data = { + "serialNumber": secondary_device_id.split("_")[0], + "lastUpdated": ( + end_unit_device_from_er.last_updated.isoformat() + if end_unit_device_from_er.last_updated + else None + ), + } main_device = { "device_id": manufacturer_id_to_source_id.get(main_device_id) @@ -220,26 +252,53 @@ def _create_haul_payload( recovery_lon = None # Determine the recorded_at timestamp for the haul event - # Use dateRecovered if available, otherwise use lastUpdated, or current time + # Use dateRecovered if available; for re-deployments (haul then immediate + # redeploy) the currentState dateRecovered is cleared, so check changeRecords + # for the most recent dateRecovered value. Fall back to lastUpdated. haul_recorded_at = datetime.now(timezone.utc) if edgetech_buoy: if edgetech_buoy.currentState.dateRecovered: haul_recorded_at = edgetech_buoy.currentState.dateRecovered - elif edgetech_buoy.currentState.lastUpdated: - haul_recorded_at = edgetech_buoy.currentState.lastUpdated - - if ( - edgetech_buoy - and edgetech_buoy.currentState.recoveredLatDeg - and edgetech_buoy.currentState.recoveredLonDeg - ): - recovery_location_available = True - recovery_lat = edgetech_buoy.currentState.recoveredLatDeg - recovery_lon = edgetech_buoy.currentState.recoveredLonDeg - logger.info( - f"Using recovery location from EdgeTech for gear {er_gear.display_id}: " - f"({recovery_lat}, {recovery_lon})" - ) + else: + # Look for dateRecovered in changeRecords (re-deployment: currentState + # was overwritten by the redeploy, but the haul timestamp is preserved + # in the change history) + recovered_at_from_changes = ( + self._get_date_recovered_from_change_records(edgetech_buoy) + ) + if recovered_at_from_changes: + haul_recorded_at = recovered_at_from_changes + logger.info( + f"Using dateRecovered from changeRecords for haul of " + f"{edgetech_buoy.serialNumber}: {recovered_at_from_changes}" + ) + elif 
edgetech_buoy.currentState.lastUpdated: + haul_recorded_at = edgetech_buoy.currentState.lastUpdated + + if edgetech_buoy: + if ( + edgetech_buoy.currentState.recoveredLatDeg is not None + and edgetech_buoy.currentState.recoveredLonDeg is not None + ): + recovery_location_available = True + recovery_lat = edgetech_buoy.currentState.recoveredLatDeg + recovery_lon = edgetech_buoy.currentState.recoveredLonDeg + else: + # Re-deployment: recovery location may be in changeRecords + # (currentState was overwritten by the redeploy) + rec_lat, rec_lon = self._get_recovery_location_from_change_records( + edgetech_buoy + ) + if rec_lat is not None and rec_lon is not None: + recovery_location_available = True + recovery_lat = rec_lat + recovery_lon = rec_lon + + if recovery_location_available: + logger.info( + f"Using recovery location for gear {er_gear.display_id}: " + f"({recovery_lat}, {recovery_lon})" + ) for device in er_gear.devices: # Use recovery location if available, otherwise use deployed location from ER @@ -304,6 +363,68 @@ def _is_hauled_or_recovered(self, record: Buoy) -> bool: or record.currentState.dateRecovered is not None ) + @staticmethod + def _get_date_recovered_from_change_records(buoy: Buoy) -> Optional[datetime]: + """ + Find the most recent dateRecovered value from a buoy's changeRecords. + + This is needed for re-deployment scenarios where the buoy was hauled and + immediately redeployed — the currentState no longer has dateRecovered + (cleared by the redeploy), but the changeRecords preserve the haul timestamp. + + Returns: + The most recent dateRecovered datetime, or None if not found. 
+ """ + most_recent = None + for record in buoy.changeRecords: + for change in record.changes: + if change.key == "dateRecovered" and change.newValue is not None: + try: + recovered_dt = datetime.fromisoformat( + str(change.newValue).replace("Z", "+00:00") + ) + if most_recent is None or recovered_dt > most_recent: + most_recent = recovered_dt + except (ValueError, TypeError): + continue + return most_recent + + @staticmethod + def _get_recovery_location_from_change_records( + buoy: Buoy, + ) -> Tuple[Optional[float], Optional[float]]: + """ + Find the most recent recovery location from a buoy's changeRecords. + + Needed for re-deployment scenarios where recoveredLatDeg/recoveredLonDeg + in currentState were cleared by the redeploy. + + Returns: + Tuple of (latitude, longitude) or (None, None) if not found. + """ + # Find the changeRecord that set dateRecovered most recently — + # its sibling entries will have the recovery coordinates. + best_timestamp = None + best_lat = None + best_lon = None + for record in buoy.changeRecords: + lat = None + lon = None + has_recovery = False + for change in record.changes: + if change.key == "dateRecovered" and change.newValue is not None: + has_recovery = True + elif change.key == "recoveredLatDeg" and change.newValue is not None: + lat = change.newValue + elif change.key == "recoveredLonDeg" and change.newValue is not None: + lon = change.newValue + if has_recovery and lat is not None and lon is not None: + if best_timestamp is None or record.timestamp > best_timestamp: + best_timestamp = record.timestamp + best_lat = lat + best_lon = lon + return best_lat, best_lon + def _should_skip_buoy(self, record: Buoy) -> Tuple[bool, Optional[str]]: """ Determine if a buoy record should be skipped during processing. @@ -408,9 +529,11 @@ async def _identify_buoys( if a haul event should be generated, rather than inferring hauling from absence in the dataset. Process: - - `to_deploy`: Buoys that are deployed in EdgeTech but not yet in ER. 
+ - `to_deploy`: Buoys that are deployed in EdgeTech but not yet in ER, or re-deployments + (EdgeTech dateDeployed newer than ER gear's deployment: haul existing then deploy new). - `to_update`: Buoys that exist in ER and have changes (location, status, or newer data). - - `to_haul`: Buoys that are explicitly marked as deleted or not deployed in EdgeTech. + - `to_haul`: Buoys that are explicitly marked as deleted or not deployed in EdgeTech, or re-deployments + (close existing gear before new deployment). Args: er_gears_devices_id_to_gear (Dict[str, BuoyGear]): Mapping of ER device IDs to gear objects. @@ -477,8 +600,54 @@ async def _identify_buoys( logger.info( f"Buoy {serial_number_user_id} already hauled in ER, skipping" ) + elif ( + er_gear.status != "deployed" + and edgetech_buoy.currentState.isDeployed + ): + # ER gear was hauled (e.g. previous re-deployment haul succeeded + # but the deploy failed), and EdgeTech shows it as deployed now. + # Treat as a new deployment so the gear gets created in ER. + to_deploy.add(serial_number_user_id) + logger.info( + f"Buoy {serial_number_user_id} marked for deployment " + f"(ER gear status={er_gear.status} but EdgeTech isDeployed=True; " + f"recovering missed deployment)" + ) else: - # Buoy is still deployed - check if it needs updating + # Buoy is still deployed - check for re-deployment first (new deployment + # with same serials: close previous gear and create new one) + er_gear_deployment_dates = [ + d.last_deployed + for d in er_gear.devices + if isinstance(getattr(d, "last_deployed", None), datetime) + ] + er_gear_deployment_date = ( + min(er_gear_deployment_dates) + if er_gear_deployment_dates + else None + ) + edgetech_date_deployed = edgetech_buoy.currentState.dateDeployed + # Re-deployment: EdgeTech dateDeployed is meaningfully later than ER's + # (e.g. same units deployed again). Use 1-minute threshold to avoid + # treating microsecond/timestamp precision differences as re-deploy. 
+ is_redeployment = ( + er_gear_deployment_date is not None + and edgetech_date_deployed is not None + and (edgetech_date_deployed - er_gear_deployment_date) + > timedelta(minutes=1) + and er_gear.status == "deployed" + ) + if is_redeployment: + to_haul.add(serial_number_user_id) + to_deploy.add(serial_number_user_id) + logger.info( + f"Buoy {serial_number_user_id} marked for re-deployment " + f"(EdgeTech dateDeployed={edgetech_date_deployed} > ER gear deployment " + f"{er_gear_deployment_date}); will haul existing gear then deploy new." + ) + continue + + # Check if it needs updating (location or newer data) edgetech_buoy_current_location = ( edgetech_buoy.currentState.latDeg, edgetech_buoy.currentState.lonDeg, @@ -538,6 +707,57 @@ async def _identify_buoys( return to_deploy, to_haul, to_update + def _get_circular_two_unit_start_keys_to_skip( + self, serial_number_to_edgetech_buoy: Dict[str, Buoy] + ) -> Set[str]: + """ + Detect circular two-unit line configs: same two devices each configured as + start unit with the other as end (e.g. A.endUnit=B and B.endUnit=A). That + would create two gearsets for one physical pair. We keep one canonical + start (lower serial number) and skip the other. + + Returns: + Set of buoy keys (serialNumber/hashedUserId) to skip when processing + deployments/updates for two-unit lines. 
+ """ + skip_keys: Set[str] = set() + seen_pairs: Set[Tuple[str, str]] = set() # (min_serial, max_serial) for dedup + + for buoy in serial_number_to_edgetech_buoy.values(): + if not buoy.currentState.isTwoUnitLine or not buoy.currentState.endUnit: + continue + if buoy.currentState.startUnit: + continue # This is an end-unit record, not a start + partner_serial = buoy.currentState.endUnit + hashed_user_id = get_hashed_user_id(buoy.userId) + partner_key = f"{partner_serial}/{hashed_user_id}" + partner = serial_number_to_edgetech_buoy.get(partner_key) + if not partner or not partner.currentState.isTwoUnitLine: + continue + # Circular: partner also has endUnit pointing back to this buoy (so both are "start") + if partner.currentState.endUnit != buoy.serialNumber: + continue + if partner.currentState.startUnit: + continue # Partner is end unit, not start; no conflict + pair = tuple(sorted([buoy.serialNumber, partner_serial])) + if pair in seen_pairs: + continue + seen_pairs.add(pair) + # Skip the one with the larger serial so we have a single canonical gearset + skip_serial = max(buoy.serialNumber, partner_serial) + skip_key = f"{skip_serial}/{hashed_user_id}" + skip_keys.add(skip_key) + logger.warning( + "Circular two-unit line detected: devices %s and %s each configured as " + "start with the other as end. Skipping duplicate start for %s (canonical lead: %s). " + "Only one gearset will be created for this pair.", + buoy.serialNumber, + partner_serial, + skip_serial, + min(buoy.serialNumber, partner_serial), + ) + return skip_keys + async def process(self) -> List[Dict[str, Any]]: """ Process buoy data to generate gear payloads for the Buoy API. @@ -551,11 +771,11 @@ async def process(self) -> List[Dict[str, Any]]: 2. Fetches existing ER gears and creates mappings for efficient lookups. 3. Fetches all sources once for manufacturer ID to source ID mapping. 4. 
Categorizes buoys based on explicit status checks: - - Deploy: Buoys marked as deployed in EdgeTech but not yet in ER. + - Deploy: Buoys marked as deployed in EdgeTech but not yet in ER, or re-deployments. - Update: Buoys in ER with location changes or newer data from EdgeTech. - - Haul: Buoys explicitly marked as deleted or not deployed in EdgeTech - while still showing as deployed in ER. - 5. Creates gear payloads directly for each operation. + - Haul: Buoys explicitly marked as deleted or not deployed in EdgeTech, or re-deployments + (close existing gear so a new deployment can be created). + 5. Creates gear payloads: hauls first (close old gear), then deployments, then updates. Important: Absence of a buoy from the sync window does NOT imply it was hauled. Haul events are only generated when EdgeTech explicitly marks a buoy as deleted @@ -583,6 +803,11 @@ async def process(self) -> List[Dict[str, Any]]: f"Processing {len(serial_number_to_edgetech_buoy)} buoys from EdgeTech sync window" ) + # Detect circular two-unit configs (same pair each as start) and skip duplicate + circular_two_unit_skip_keys = self._get_circular_two_unit_start_keys_to_skip( + serial_number_to_edgetech_buoy + ) + # Fetch all sources once and create a mapping for efficient lookups logger.info("Fetching all sources from Buoy API...") sources = await self._er_client.get_sources(params={"page_size": 10000}) @@ -608,6 +833,58 @@ async def process(self) -> List[Dict[str, Any]]: gear_payloads = [] + # Process hauls first (so re-deployments close old gear before creating new) + haul_gears_processed = set() + + for serial_number_user_id in to_haul: + serial_number, hashed_user_id = serial_number_user_id.split("/", 2) + primary_device_name = f"{serial_number}_{hashed_user_id}_A" + single_device_name = f"{serial_number}_{hashed_user_id}" + + # Get the EdgeTech buoy data for potential recovery location + edgetech_buoy = serial_number_to_edgetech_buoy.get(serial_number_user_id) + + # Find the 
corresponding ER gear + er_gear = er_gears_devices_id_to_gear.get( + primary_device_name + ) or er_gears_devices_id_to_gear.get(single_device_name) + + if not er_gear: + logger.warning( + "No ER gear found for buoy %s (tried %s and %s), skipping haul.", + serial_number_user_id, + primary_device_name, + single_device_name, + ) + continue + + # Skip if we already processed this gear set + if er_gear.display_id in haul_gears_processed: + logger.info( + f"Gear set {er_gear.display_id} already processed for haul, " + f"skipping buoy {serial_number_user_id}" + ) + continue + + try: + payload = self._create_haul_payload( + er_gear=er_gear, edgetech_buoy=edgetech_buoy + ) + gear_payloads.append(payload) + haul_gears_processed.add(er_gear.display_id) + logger.info( + f"Created haul payload for gear set {er_gear.display_id} " + f"(buoy {serial_number_user_id})" + ) + + except Exception as e: + logger.exception( + "Failed to create haul payload for gear set %s (buoy %s). Error: %s", + er_gear.display_id if er_gear else "unknown", + serial_number_user_id, + str(e), + ) + # Process deployments (new gear sets) for serial_number_user_id in to_deploy: edgetech_buoy = serial_number_to_edgetech_buoy[serial_number_user_id] @@ -615,6 +892,7 @@ async def process(self) -> List[Dict[str, Any]]: try: # Get end unit buoy if this is a two-unit line end_unit_buoy = None + end_unit_device_from_er = None if ( edgetech_buoy.currentState.isTwoUnitLine and edgetech_buoy.currentState.endUnit @@ -625,22 +903,58 @@ async def process(self) -> List[Dict[str, Any]]: ) if not end_unit_buoy: - logger.warning( - "End unit buoy %s not found for serial number %s, skipping deployment.", - edgetech_buoy.currentState.endUnit, - serial_number_user_id, + # End unit not in sync window. Try to get it from ER + # (covers re-deployments where haul already happened, + # and recovery deployments where ER gear was hauled). 
+ end_unit_mfr_id = ( + f"{edgetech_buoy.currentState.endUnit}" + f"_{get_hashed_user_id(edgetech_buoy.userId)}" ) - continue + er_gear = er_gears_devices_id_to_gear.get( + f"{serial_number_user_id.replace('/', '_')}_A" + ) or er_gears_devices_id_to_gear.get( + serial_number_user_id.replace("/", "_") + ) + if er_gear: + for er_device in er_gear.devices: + if er_device.mfr_device_id == end_unit_mfr_id: + end_unit_device_from_er = er_device + break + if end_unit_device_from_er: + logger.info( + "End unit %s not in sync window; " + "using ER state for deployment of %s", + edgetech_buoy.currentState.endUnit, + serial_number_user_id, + ) + else: + logger.warning( + "End unit buoy %s not found for %s, " + "skipping deployment.", + edgetech_buoy.currentState.endUnit, + serial_number_user_id, + ) + continue if edgetech_buoy.currentState.startUnit: # This record is for the end unit, skip it (will be handled by start unit) continue + if serial_number_user_id in circular_two_unit_skip_keys: + logger.warning( + "Skipping deployment for %s (circular two-unit duplicate).", + serial_number_user_id, + ) + continue + payload = await self._create_gear_payload( buoy=edgetech_buoy, device_status="deployed", manufacturer_id_to_source_id=manufacturer_id_to_source_id, end_unit_buoy=end_unit_buoy, + end_unit_device_from_er=( + end_unit_device_from_er if not end_unit_buoy else None + ), include_initial_deployment=True, ) gear_payloads.append(payload) @@ -697,6 +1011,7 @@ async def process(self) -> List[Dict[str, Any]]: try: # Get end unit buoy if this is a two-unit line end_unit_buoy = None + end_unit_device_from_er = None if edgetech_buoy.currentState.isTwoUnitLine: if edgetech_buoy.currentState.endUnit: end_unit_buoy_key = f"{edgetech_buoy.currentState.endUnit}/{get_hashed_user_id(edgetech_buoy.userId)}" @@ -705,22 +1020,44 @@ async def process(self) -> List[Dict[str, Any]]: ) if not end_unit_buoy: - logger.warning( - "End unit buoy %s not found for serial number %s, skipping update.", + 
# End unit not in sync window (e.g. only start unit had location update). + # Use end unit's current state from ER so we can still push the update. + # Match by mfr_device_id: same format as when we create the payload. + end_unit_mfr_id = f"{edgetech_buoy.currentState.endUnit}_{get_hashed_user_id(edgetech_buoy.userId)}" + for er_device in er_gear.devices: + if er_device.mfr_device_id == end_unit_mfr_id: + end_unit_device_from_er = er_device + break + if not end_unit_device_from_er: + logger.warning( + "End unit buoy %s not found and no end unit device in ER gear for %s, skipping update.", + edgetech_buoy.currentState.endUnit, + serial_number_user_id, + ) + continue + logger.info( + "End unit %s not in sync window; using current state from ER for update of %s", edgetech_buoy.currentState.endUnit, serial_number_user_id, ) - continue if edgetech_buoy.currentState.startUnit: # This record is for the end unit, skip it continue + if serial_number_user_id in circular_two_unit_skip_keys: + logger.warning( + "Skipping update for %s (circular two-unit duplicate).", + serial_number_user_id, + ) + continue + payload = await self._create_gear_payload( buoy=edgetech_buoy, device_status="deployed", manufacturer_id_to_source_id=manufacturer_id_to_source_id, end_unit_buoy=end_unit_buoy, + end_unit_device_from_er=end_unit_device_from_er, set_id=er_gear.id, include_initial_deployment=False, ) @@ -734,59 +1071,6 @@ async def process(self) -> List[Dict[str, Any]]: str(e), ) - # Process hauls (gear sets explicitly marked as deleted or not deployed) - # Group devices by gear set to avoid duplicate haul payloads - haul_gears_processed = set() - - for serial_number_user_id in to_haul: - serial_number, hashed_user_id = serial_number_user_id.split("/", 2) - primary_device_name = f"{serial_number}_{hashed_user_id}_A" - single_device_name = f"{serial_number}_{hashed_user_id}" - - # Get the EdgeTech buoy data for potential recovery location - edgetech_buoy = 
serial_number_to_edgetech_buoy.get(serial_number_user_id)
-
-            # Find the corresponding ER gear
-            er_gear = er_gears_devices_id_to_gear.get(
-                primary_device_name
-            ) or er_gears_devices_id_to_gear.get(single_device_name)
-
-            if not er_gear:
-                logger.warning(
-                    "No ER gear found for buoy %s (tried %s and %s), skipping haul.",
-                    serial_number_user_id,
-                    primary_device_name,
-                    single_device_name,
-                )
-                continue
-
-            # Skip if we already processed this gear set
-            if er_gear.display_id in haul_gears_processed:
-                logger.info(
-                    f"Gear set {er_gear.display_id} already processed for haul, "
-                    f"skipping buoy {serial_number_user_id}"
-                )
-                continue
-
-            try:
-                payload = self._create_haul_payload(
-                    er_gear=er_gear, edgetech_buoy=edgetech_buoy
-                )
-                gear_payloads.append(payload)
-                haul_gears_processed.add(er_gear.display_id)
-                logger.info(
-                    f"Created haul payload for gear set {er_gear.display_id} "
-                    f"(buoy {serial_number_user_id})"
-                )
-
-            except Exception as e:
-                logger.exception(
-                    "Failed to create haul payload for gear set %s (buoy %s). Error: %s",
-                    er_gear.display_id if er_gear else "unknown",
-                    serial_number_user_id,
-                    str(e),
-                )
-
         logger.info(
             "Generated %d gear payload(s):\n%s",
             len(gear_payloads),
diff --git a/app/actions/tests/test_edgetech_processor.py b/app/actions/tests/test_edgetech_processor.py
index c122da05..9e1e5258 100644
--- a/app/actions/tests/test_edgetech_processor.py
+++ b/app/actions/tests/test_edgetech_processor.py
@@ -5,7 +5,6 @@
 import pydantic
 import pytest
-from freezegun import freeze_time
 
 from app.actions.buoy.types import BuoyDevice, BuoyGear, DeviceLocation
 from app.actions.edgetech.processor import EdgeTechProcessor
@@ -27,7 +26,9 @@ async def test_process_new_edgetech_trawl(mocker, a_new_edgetech_trawl_record):
     mock_er_client = mocker.MagicMock()
     mock_er_client.get_er_gears = AsyncMock(return_value=[])
     mock_er_client.get_sources = AsyncMock(return_value=[])
-    mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+    mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+        return_value=None
+    )
     processor._er_client = mock_er_client
 
     # Act & Assert - The process should complete without errors
@@ -199,7 +200,9 @@ def test_should_not_skip_valid_buoy(self, a_new_edgetech_trawl_record):
         assert should_skip is False
         assert reason is None
 
-    def test_should_not_skip_hauled_buoy_without_recovery_location(self, hauled_buoy_no_recovery_location):
+    def test_should_not_skip_hauled_buoy_without_recovery_location(
+        self, hauled_buoy_no_recovery_location
+    ):
         """Test that hauled buoys without recovery location are NOT skipped (bug fix)."""
         processor = EdgeTechProcessor(data=[], er_token="token", er_url="url")
 
@@ -211,7 +214,12 @@ def test_should_not_skip_hauled_buoy_without_recovery_location(self, hauled_buoy
         assert should_skip is False
         assert reason is None
 
-    def test_is_hauled_or_recovered(self, hauled_buoy_no_recovery_location, deleted_buoy_record, non_deployed_buoy_record):
+    def test_is_hauled_or_recovered(
+        self,
+        hauled_buoy_no_recovery_location,
+        deleted_buoy_record,
+        non_deployed_buoy_record,
+    ):
         """Test the _is_hauled_or_recovered helper method."""
         processor = EdgeTechProcessor(data=[], er_token="token", er_url="url")
 
@@ -227,7 +235,9 @@ def test_is_hauled_or_recovered(self, hauled_buoy_no_recovery_location, deleted_
         non_deployed_buoy = Buoy.parse_obj(non_deployed_buoy_record)
         assert processor._is_hauled_or_recovered(non_deployed_buoy) is True
 
-    def test_create_haul_payload_without_recovery_location(self, hauled_buoy_no_recovery_location):
+    def test_create_haul_payload_without_recovery_location(
+        self, hauled_buoy_no_recovery_location
+    ):
        """Test that haul payload uses fallback location when recovery location is not available."""
         processor = EdgeTechProcessor(data=[], er_token="token", er_url="url")
 
@@ -236,7 +246,9 @@ def test_create_haul_payload_without_recovery_location(self, hauled_buoy_no_reco
             device_id="source_id_123",
             mfr_device_id="HAUL123_hashed_user_A",
             label="Test Device",
-            location=DeviceLocation(latitude=40.7128, longitude=-74.0060),  # Last known deployed location
+            location=DeviceLocation(
+                latitude=40.7128, longitude=-74.0060
+            ),  # Last known deployed location
             last_updated=datetime.now(timezone.utc),
             last_deployed=datetime.now(timezone.utc) - timedelta(days=1),
         )
@@ -253,7 +265,9 @@ def test_create_haul_payload_without_recovery_location(self, hauled_buoy_no_reco
         # Create haul payload with EdgeTech buoy that has no recovery location
         edgetech_buoy = Buoy.parse_obj(hauled_buoy_no_recovery_location)
 
-        payload = processor._create_haul_payload(er_gear=mock_gear, edgetech_buoy=edgetech_buoy)
+        payload = processor._create_haul_payload(
+            er_gear=mock_gear, edgetech_buoy=edgetech_buoy
+        )
 
         # Verify payload structure
         assert payload["set_id"] == "GEAR789"
@@ -316,7 +330,9 @@ def test_create_haul_payload_with_recovery_location(self):
         )
 
         edgetech_buoy = Buoy.parse_obj(hauled_buoy_with_recovery)
-        payload = processor._create_haul_payload(er_gear=mock_gear, edgetech_buoy=edgetech_buoy)
+        payload = processor._create_haul_payload(
+            er_gear=mock_gear, edgetech_buoy=edgetech_buoy
+        )
 
         # Verify device uses recovery location (not deployed location)
         device = payload["devices"][0]
@@ -324,7 +340,6 @@ def test_create_haul_payload_with_recovery_location(self):
         assert device["location"]["latitude"] == 41.0
         assert device["location"]["longitude"] == -73.0
 
-
     @pytest.mark.asyncio
     async def test_filter_edgetech_buoys_data_filters_out_invalid(
         self,
@@ -353,8 +368,8 @@ async def test_filter_edgetech_buoys_data_filters_out_invalid(
         assert len(filtered_data) == 3
         serial_numbers = [buoy.serialNumber for buoy in filtered_data]
         assert "8899CEDAAA" in serial_numbers  # valid deployed
-        assert "DEL123" in serial_numbers # deleted (kept for haul detection)
-        assert "NDEP123" in serial_numbers # non-deployed (kept for haul detection)
+        assert "DEL123" in serial_numbers  # deleted (kept for haul detection)
+        assert "NDEP123" in serial_numbers  # non-deployed (kept for haul detection)
         assert "NOLOC123" not in serial_numbers  # no location (filtered out)
 
         # Should have logged warning only for no-location record
@@ -416,23 +431,27 @@ async def test_identify_buoys_update_existing_buoy(
         data = [a_new_edgetech_trawl_record]
         processor = EdgeTechProcessor(data=data, er_token="token", er_url="url")
 
-        # Mock existing ER gear with older timestamp
-        older_time = datetime(2025, 5, 25, 10, 0, 0, tzinfo=timezone.utc)
+        # Mock existing ER gear: last_deployed at or after EdgeTech dateDeployed so this
+        # is an update (same deployment), not a re-deployment; last_updated before
+        # EdgeTech lastUpdated so we have newer data and get to_update.
+        # Fixture has dateDeployed 2025-05-25 17:53:19.517, lastUpdated 17:53:19.731
+        er_deployed = datetime(2025, 5, 25, 17, 53, 20, tzinfo=timezone.utc)
+        er_updated = datetime(2025, 5, 25, 17, 53, 19, tzinfo=timezone.utc)
         mock_device = BuoyDevice(
             device_id="8899CEDAAA_n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW_A",
             mfr_device_id="8899CEDAAA_n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW_A",
             label="Test Device",
             location=DeviceLocation(latitude=44.358265, longitude=-68.16757),
-            last_updated=older_time,
-            last_deployed=older_time,
+            last_updated=er_updated,
+            last_deployed=er_deployed,
         )
 
         mock_gear = BuoyGear(
             id=uuid4(),
             display_id="GEAR123",
             status="deployed",
-            last_updated=older_time,
+            last_updated=er_updated,
             devices=[mock_device],
             type="ropeless",
             manufacturer="edgetech",
@@ -462,15 +481,19 @@ async def test_identify_buoys_update_existing_buoy(
         )
 
     @pytest.mark.asyncio
-    async def test_identify_buoys_haul_deleted_buoy(self, mocker, a_new_edgetech_trawl_record):
+    async def test_identify_buoys_haul_deleted_buoy(
+        self, mocker, a_new_edgetech_trawl_record
+    ):
         """Test that buoys explicitly marked as deleted in EdgeTech are identified for hauling."""
         # Create a deleted buoy record
         deleted_record = a_new_edgetech_trawl_record.copy()
         deleted_record["currentState"] = deleted_record["currentState"].copy()
         deleted_record["currentState"]["isDeleted"] = True
         deleted_record["serialNumber"] = "DELETED123"
-
-        processor = EdgeTechProcessor(data=[deleted_record], er_token="token", er_url="url")
+
+        processor = EdgeTechProcessor(
+            data=[deleted_record], er_token="token", er_url="url"
+        )
 
         # Mock existing ER gear that is still deployed
         mock_device = BuoyDevice(
@@ -497,7 +520,9 @@ async def test_identify_buoys_haul_deleted_buoy(self, mocker, a_new_edgetech_tra
         }
 
         serial_number_to_edgetech_buoy = {
-            "DELETED123/n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW": processor._data[0]
+            "DELETED123/n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW": processor._data[
+                0
+            ]
         }
 
         to_deploy, to_haul, to_update = await processor._identify_buoys(
@@ -507,7 +532,10 @@ async def test_identify_buoys_haul_deleted_buoy(
         assert len(to_deploy) == 0
         assert len(to_haul) == 1
         assert len(to_update) == 0
-        assert "DELETED123/n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW" in to_haul
+        assert (
+            "DELETED123/n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW"
+            in to_haul
+        )
 
     @pytest.mark.asyncio
     async def test_identify_buoys_no_haul_for_missing_buoy(self, mocker):
@@ -564,7 +592,9 @@ async def test_process_with_two_unit_line_missing_end_unit(
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         with caplog.at_level(logging.WARNING):
@@ -601,7 +631,9 @@ async def test_process_with_end_unit_buoy(
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         # Act & Assert - The process should complete without errors
@@ -652,7 +684,9 @@ async def test_process_updates_existing_buoy(
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[mock_gear])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         # Act
@@ -690,7 +724,9 @@ async def test_process_creates_haul_observations(self, mocker):
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[mock_gear])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         # Act
@@ -716,7 +752,7 @@ async def test_process_deploy_validation_error(
 
         with caplog.at_level(logging.ERROR):
             with pytest.raises(Exception, match="Test error accessing sources"):
-                gear_payloads = await processor.process()
+                await processor.process()
 
     @pytest.mark.asyncio
     async def test_process_update_missing_end_unit(
@@ -737,7 +773,9 @@ async def test_process_update_missing_end_unit(
             device_id="8899CEDAAA_n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW_A",
             mfr_device_id="8899CEDAAA_n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW_A",
             label="Test Device",
-            location=DeviceLocation(latitude=44.0, longitude=-68.0),  # Different location
+            location=DeviceLocation(
+                latitude=44.0, longitude=-68.0
+            ),  # Different location
             last_updated=datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc),
             last_deployed=datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc),
         )
@@ -755,15 +793,125 @@ async def test_process_update_missing_end_unit(
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[mock_gear])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         with caplog.at_level(logging.WARNING):
-            observations = await processor.process()
+            await processor.process()
 
-        # Should log warning about missing end unit during update
+        # Should log warning about missing end unit during update (and no end unit in ER gear)
         assert "End unit buoy MISSING_END_UNIT not found" in caplog.text
-        # Note: may still generate haul observations
+
+    @pytest.mark.asyncio
+    async def test_process_update_two_unit_uses_er_when_end_unit_not_in_sync_window(
+        self, mocker, caplog
+    ):
+        """
+        When only the start unit has a location update (in sync window) and the end unit
+        is not in the sync window, we should still send the update using the end unit's
+        current state from ER (fixes location updates not appearing in ER/Buoy).
+        """
+        user_id = "5f455a89e7ef8c0068db9ae1"
+        hashed = get_hashed_user_id(user_id)
+        start_serial = "88CE99B71C"
+        end_serial = "88CE99CAE8"
+
+        # Only start unit in "sync window" - new location at 15:01:58
+        start_unit_record = {
+            "serialNumber": start_serial,
+            "userId": user_id,
+            "currentState": {
+                "etag": '"1771167718663"',
+                "isDeleted": False,
+                "positionSetByCapri": False,
+                "serialNumber": start_serial,
+                "releaseCommand": "C8AB8CEA9C",
+                "statusCommand": start_serial,
+                "idCommand": "CCCCCCCCCC",
+                "isNfcTag": False,
+                "latDeg": 40.35775,
+                "lonDeg": -70.96013333333333,
+                "modelNumber": "5112",
+                "isDeployed": True,
+                "dateDeployed": "2026-02-15T14:56:47.660Z",
+                "isTwoUnitLine": True,
+                "endUnit": end_serial,
+                "licenseNumber": "330901",
+                "lastUpdated": "2026-02-15T15:01:58.663Z",
+            },
+            "changeRecords": [],
+        }
+
+        data = [start_unit_record]
+        processor = EdgeTechProcessor(data=data, er_token="token", er_url="url")
+
+        # ER gear has both devices (start + end); start has old location to trigger update
+        start_device_id = f"{start_serial}_{hashed}_A"
+        end_device_id = f"{end_serial}_{hashed}"
+        start_device_er = BuoyDevice(
+            device_id=start_device_id,
+            mfr_device_id=start_device_id,
+            label="A",
+            location=DeviceLocation(latitude=40.3575109, longitude=-70.9632526),
+            last_updated=datetime(2026, 2, 15, 14, 56, 48, tzinfo=timezone.utc),
+            last_deployed=datetime(2026, 2, 15, 14, 56, 47, tzinfo=timezone.utc),
+        )
+        end_device_er = BuoyDevice(
+            device_id=end_device_id,
+            mfr_device_id=end_device_id,
+            label="B",
+            location=DeviceLocation(latitude=40.358, longitude=-70.959),
+            last_updated=datetime(2026, 2, 15, 14, 56, 48, tzinfo=timezone.utc),
+            last_deployed=datetime(2026, 2, 15, 14, 56, 47, tzinfo=timezone.utc),
+        )
+        mock_gear = BuoyGear(
+            id=uuid4(),
+            display_id="GEAR123",
+            status="deployed",
+            last_updated=datetime(2026, 2, 15, 14, 56, 48, tzinfo=timezone.utc),
+            devices=[start_device_er, end_device_er],
+            type="trawl",
+            manufacturer="edgetech",
+        )
+
+        mock_er_client = mocker.MagicMock()
+        mock_er_client.get_er_gears = AsyncMock(return_value=[mock_gear])
+        mock_er_client.get_sources = AsyncMock(return_value=[])
+        mock_er_client.send_gear_to_buoy_api = AsyncMock(
+            return_value={"status": "success", "status_code": 200}
+        )
+        processor._er_client = mock_er_client
+
+        with caplog.at_level(logging.INFO):
+            payloads = await processor.process()
+
+        assert len(payloads) == 1
+        payload = payloads[0]
+        assert payload["devices_in_set"] == 2
+        devices_by_mfr = {d["mfr_device_id"]: d for d in payload["devices"]}
+        # When end unit comes from ER, start unit is sent without _A suffix
+        start_in_payload = (
+            start_device_id.replace("_A", "") in devices_by_mfr
+            or start_device_id in devices_by_mfr
+        )
+        assert (
+            start_in_payload
+        ), f"Start device not in payload: {list(devices_by_mfr.keys())}"
+        assert end_device_id in devices_by_mfr
+        start_key = (
+            start_device_id
+            if start_device_id in devices_by_mfr
+            else start_device_id.replace("_A", "")
+        )
+        # Start unit has new location from EdgeTech
+        assert devices_by_mfr[start_key]["location"]["latitude"] == 40.35775
+        assert devices_by_mfr[start_key]["location"]["longitude"] == -70.96013333333333
+        # End unit has existing location from ER (not in sync window)
+        assert devices_by_mfr[end_device_id]["location"]["latitude"] == 40.358
+        assert devices_by_mfr[end_device_id]["location"]["longitude"] == -70.959
+        assert "not in sync window; using current state from ER" in caplog.text
 
     @pytest.mark.asyncio
     async def test_process_update_validation_error(
@@ -773,21 +921,26 @@ async def test_process_update_validation_error(
         data = [a_new_edgetech_trawl_record]
         processor = EdgeTechProcessor(data=data, er_token="token", er_url="url")
 
-        # Create existing ER gear for update scenario with different location
+        # ER gear: last_deployed at or after EdgeTech dateDeployed so we get update path
+        # (not re-deploy). Fixture has dateDeployed 2025-05-25 17:53:19.517
+        er_deployed = datetime(2025, 5, 25, 17, 53, 20, tzinfo=timezone.utc)
+        er_updated = datetime(2025, 5, 25, 17, 53, 19, tzinfo=timezone.utc)
         mock_device = BuoyDevice(
             device_id="8899CEDAAA_n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW_A",
             mfr_device_id="8899CEDAAA_n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW_A",
             label="Test Device",
-            location=DeviceLocation(latitude=44.0, longitude=-68.0),  # Different location
-            last_updated=datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc),
-            last_deployed=datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc),
+            location=DeviceLocation(
+                latitude=44.0, longitude=-68.0
+            ),  # Different location to trigger update
+            last_updated=er_updated,
+            last_deployed=er_deployed,
         )
 
         mock_gear = BuoyGear(
             id=uuid4(),
             display_id="GEAR123",
             status="deployed",
-            last_updated=datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc),
+            last_updated=er_updated,
             devices=[mock_device],
             type="ropeless",
             manufacturer="edgetech",
@@ -796,18 +949,23 @@ async def test_process_update_validation_error(
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[mock_gear])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         # Mock _create_gear_payload to raise ValidationError (async mock)
         mock_create = AsyncMock(side_effect=pydantic.ValidationError([], Buoy))
-        mocker.patch.object(processor, '_create_gear_payload', mock_create)
+        mocker.patch.object(processor, "_create_gear_payload", mock_create)
 
         with caplog.at_level(logging.ERROR):
            payloads = await processor.process()
 
-        # Should log the validation error for update
-        assert "Failed to create gear payload for update" in caplog.text
+        # Should log the validation error for update or deployment (re-deploy uses deploy path)
+        assert (
+            "Failed to create gear payload for update" in caplog.text
+            or "Failed to create gear payload for deployment" in caplog.text
+        )
         assert len(payloads) == 0
 
     @pytest.mark.asyncio
@@ -818,21 +976,25 @@ async def test_process_update_general_exception(
         data = [a_new_edgetech_trawl_record]
         processor = EdgeTechProcessor(data=data, er_token="token", er_url="url")
 
-        # Create existing ER gear for update scenario with different location
+        # ER gear: last_deployed at or after EdgeTech dateDeployed so we get update path
+        er_deployed = datetime(2025, 5, 25, 17, 53, 20, tzinfo=timezone.utc)
+        er_updated = datetime(2025, 5, 25, 17, 53, 19, tzinfo=timezone.utc)
         mock_device = BuoyDevice(
             device_id="8899CEDAAA_n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW_A",
             mfr_device_id="8899CEDAAA_n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW_A",
             label="Test Device",
-            location=DeviceLocation(latitude=44.0, longitude=-68.0),  # Different location
-            last_updated=datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc),
-            last_deployed=datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc),
+            location=DeviceLocation(
+                latitude=44.0, longitude=-68.0
+            ),  # Different location to trigger update
+            last_updated=er_updated,
+            last_deployed=er_deployed,
         )
 
         mock_gear = BuoyGear(
             id=uuid4(),
             display_id="GEAR123",
             status="deployed",
-            last_updated=datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc),
+            last_updated=er_updated,
             devices=[mock_device],
             type="ropeless",
             manufacturer="edgetech",
@@ -841,18 +1003,23 @@ async def test_process_update_general_exception(
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[mock_gear])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         # Mock _create_gear_payload to raise general Exception (async mock)
         mock_create = AsyncMock(side_effect=Exception("General error"))
-        mocker.patch.object(processor, '_create_gear_payload', mock_create)
+        mocker.patch.object(processor, "_create_gear_payload", mock_create)
 
         with caplog.at_level(logging.ERROR):
             payloads = await processor.process()
 
-        # Should log the general exception
-        assert "Failed to create gear payload for update" in caplog.text
+        # Should log the general exception (update or deployment path)
+        assert (
+            "Failed to create gear payload for update" in caplog.text
+            or "Failed to create gear payload for deployment" in caplog.text
+        )
         assert "General error" in caplog.text
         assert len(payloads) == 0
 
@@ -867,11 +1034,15 @@ async def test_process_haul_no_er_subject_found(self, mocker, caplog):
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         # Manually trigger the scenario by modifying the to_haul set
-        async def mock_identify_buoys(er_gears_devices_id_to_gear, serial_number_to_edgetech_buoy):
+        async def mock_identify_buoys(
+            er_gears_devices_id_to_gear, serial_number_to_edgetech_buoy
+        ):
            # Return a buoy serial/user that should be hauled but doesn't exist in ER
             return set(), {"NONEXISTENT/userABC"}, set()
 
@@ -915,7 +1086,9 @@ async def test_process_haul_validation_error(self, mocker, caplog):
         processor._er_client = mock_er_client
 
         # Mock _identify_buoys to return a haul set that will be processed
-        async def mock_identify_buoys(er_gears_devices_id_to_gear, serial_number_to_edgetech_buoy):
+        async def mock_identify_buoys(
+            er_gears_devices_id_to_gear, serial_number_to_edgetech_buoy
+        ):
             return set(), {"HAUL123/userABC"}, set()
 
         processor._identify_buoys = mock_identify_buoys
@@ -923,8 +1096,12 @@ async def mock_identify_buoys(er_gears_devices_id_to_gear, serial_number_to_edge
         # Mock _create_haul_payload to raise ValidationError
         def raise_validation_error(*args, **kwargs):
             raise pydantic.ValidationError([], BuoyGear)
-
-        mocker.patch.object(processor, '_create_haul_payload', new=Mock(side_effect=raise_validation_error))
+
+        mocker.patch.object(
+            processor,
+            "_create_haul_payload",
+            new=Mock(side_effect=raise_validation_error),
+        )
 
         with caplog.at_level(logging.ERROR):
             payloads = await processor.process()
@@ -950,11 +1127,15 @@ async def test_process_deploy_skip_end_unit_with_start_unit(
         # Create end unit record that should be skipped during deploy
         end_unit_record = a_new_edgetech_trawl_record.copy()
         end_unit_record["serialNumber"] = "END456"
-        end_unit_record["userId"] = a_new_edgetech_trawl_record["userId"]  # Same user for both
+        end_unit_record["userId"] = a_new_edgetech_trawl_record[
+            "userId"
+        ]  # Same user for both
         end_unit_record["currentState"] = end_unit_record["currentState"].copy()
         end_unit_record["currentState"]["serialNumber"] = "END456"
         end_unit_record["currentState"]["isTwoUnitLine"] = True
-        end_unit_record["currentState"]["startUnit"] = "START123"  # This makes it an end unit that should be skipped
+        end_unit_record["currentState"][
+            "startUnit"
+        ] = "START123"  # This makes it an end unit that should be skipped
         end_unit_record["currentState"]["endUnit"] = None
 
         # Include both records so the end unit can be found
@@ -965,7 +1146,9 @@ async def test_process_deploy_skip_end_unit_with_start_unit(
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         # The end unit should be skipped (line 258), so only start unit observations should be created
@@ -977,13 +1160,161 @@ async def test_process_deploy_skip_end_unit_with_start_unit(
         # Should not create observations for the end unit (it's skipped)
 
     @pytest.mark.asyncio
-    async def test_process_update_no_location_change_exact_coordinates(self, mocker, caplog):
+    async def test_process_circular_two_unit_line_only_one_gear_created(
+        self, mocker, caplog, a_new_edgetech_trawl_record
+    ):
+        """When two devices are each configured as start with the other as end (circular),
+        only one gearset payload is created; the duplicate start is skipped with a warning.
+        """
+        user_id = a_new_edgetech_trawl_record["userId"]
+        # Unit A: lead in one config, endUnit = B
+        record_a = a_new_edgetech_trawl_record.copy()
+        record_a["serialNumber"] = "88CE99D99E"
+        record_a["currentState"] = record_a["currentState"].copy()
+        record_a["currentState"]["serialNumber"] = "88CE99D99E"
+        record_a["currentState"]["isTwoUnitLine"] = True
+        record_a["currentState"]["endUnit"] = "88CE9978B7"
+        record_a["currentState"]["startUnit"] = None
+        record_a["currentState"]["endLatDeg"] = None
+        record_a["currentState"]["endLonDeg"] = None
+        # Unit B: also configured as lead with endUnit = A (circular)
+        record_b = a_new_edgetech_trawl_record.copy()
+        record_b["serialNumber"] = "88CE9978B7"
+        record_b["userId"] = user_id
+        record_b["currentState"] = record_b["currentState"].copy()
+        record_b["currentState"]["serialNumber"] = "88CE9978B7"
+        record_b["currentState"]["latDeg"] = 42.3267312
+        record_b["currentState"]["lonDeg"] = -70.0474376
+        record_b["currentState"]["isTwoUnitLine"] = True
+        record_b["currentState"]["endUnit"] = "88CE99D99E"
+        record_b["currentState"]["startUnit"] = None
+        record_b["currentState"]["endLatDeg"] = None
+        record_b["currentState"]["endLonDeg"] = None
+
+        data = [record_a, record_b]
+        processor = EdgeTechProcessor(data=data, er_token="token", er_url="url")
+
+        mock_er_client = mocker.MagicMock()
+        mock_er_client.get_er_gears = AsyncMock(return_value=[])
+        mock_er_client.get_sources = AsyncMock(return_value=[])
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
+        processor._er_client = mock_er_client
+
+        with caplog.at_level(logging.WARNING):
+            payloads = await processor.process()
+
+        # Exactly one gear payload (canonical lead); the other start is skipped
+        assert len(payloads) == 1
+        assert "Circular two-unit line detected" in caplog.text
+        assert "88CE99D99E" in caplog.text and "88CE9978B7" in caplog.text
+        # Canonical lead is the smaller serial (88CE9978B7 < 88CE99D99E)
+        gear = payloads[0]
+        device_mfr_ids = [d.get("mfr_device_id") for d in gear.get("devices", [])]
+        assert len(device_mfr_ids) == 2
+        hashed = get_hashed_user_id(user_id)
+        assert f"88CE9978B7_{hashed}" in device_mfr_ids
+        assert f"88CE99D99E_{hashed}" in device_mfr_ids
+
+    @pytest.mark.asyncio
+    async def test_process_circular_two_unit_both_already_in_er_same_gear_single_update(
+        self, mocker, caplog, a_new_edgetech_trawl_record
+    ):
+        """When both devices are already in ER (e.g. Feb 3 gear that later got the
+        circular pair attached), circular skip ensures we send only one update for
+        that gear, not two updates (which could duplicate or confuse the gear).
+        """
+        user_id = a_new_edgetech_trawl_record["userId"]
+        hashed = get_hashed_user_id(user_id)
+        gear_id = uuid4()
+        # Existing gear in ER with both devices (simulating circular attach to Feb 3 gear)
+        older_time = datetime(2026, 2, 3, 12, 0, 0, tzinfo=timezone.utc)
+        mock_device_a = BuoyDevice(
+            device_id=f"88CE99D99E_{hashed}",
+            mfr_device_id=f"88CE99D99E_{hashed}",
+            label="Lead",
+            location=DeviceLocation(latitude=42.32, longitude=-70.05),
+            last_updated=older_time,
+            last_deployed=older_time,
+        )
+        mock_device_b = BuoyDevice(
+            device_id=f"88CE9978B7_{hashed}",
+            mfr_device_id=f"88CE9978B7_{hashed}",
+            label="End",
+            location=DeviceLocation(latitude=42.32, longitude=-70.04),
+            last_updated=older_time,
+            last_deployed=older_time,
+        )
+        existing_gear = BuoyGear(
+            id=gear_id,
+            display_id="GEAR-FEB3",
+            status="deployed",
+            last_updated=older_time,
+            devices=[mock_device_a, mock_device_b],
+            type="ropeless",
+            manufacturer="edgetech",
+        )
+        # EdgeTech: circular two-unit (both as start with other as end)
+        record_a = a_new_edgetech_trawl_record.copy()
+        record_a["serialNumber"] = "88CE99D99E"
+        record_a["currentState"] = record_a["currentState"].copy()
+        record_a["currentState"]["serialNumber"] = "88CE99D99E"
+        record_a["currentState"]["latDeg"] = 42.3246478
+        record_a["currentState"]["lonDeg"] = -70.0545583
+        record_a["currentState"]["lastUpdated"] = "2026-02-14T22:34:54.891Z"
+        record_a["currentState"]["isTwoUnitLine"] = True
+        record_a["currentState"]["endUnit"] = "88CE9978B7"
+        record_a["currentState"]["startUnit"] = None
+        record_a["currentState"]["endLatDeg"] = None
+        record_a["currentState"]["endLonDeg"] = None
+        record_b = a_new_edgetech_trawl_record.copy()
+        record_b["serialNumber"] = "88CE9978B7"
+        record_b["userId"] = user_id
+        record_b["currentState"] = record_b["currentState"].copy()
+        record_b["currentState"]["serialNumber"] = "88CE9978B7"
+        record_b["currentState"]["latDeg"] = 42.3267312
+        record_b["currentState"]["lonDeg"] = -70.0474376
+        record_b["currentState"]["lastUpdated"] = "2026-02-14T22:34:54.975Z"
+        record_b["currentState"]["isTwoUnitLine"] = True
+        record_b["currentState"]["endUnit"] = "88CE99D99E"
+        record_b["currentState"]["startUnit"] = None
+        record_b["currentState"]["endLatDeg"] = None
+        record_b["currentState"]["endLonDeg"] = None
+
+        data = [record_a, record_b]
+        processor = EdgeTechProcessor(data=data, er_token="token", er_url="url")
+
+        mock_er_client = mocker.MagicMock()
+        mock_er_client.get_er_gears = AsyncMock(return_value=[existing_gear])
+        mock_er_client.get_sources = AsyncMock(return_value=[])
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
+        processor._er_client = mock_er_client
+
+        with caplog.at_level(logging.WARNING):
+            payloads = await processor.process()
+
+        # One update payload for the existing gear (no second update from the other start)
+        assert len(payloads) == 1
+        assert payloads[0].get("set_id") == str(gear_id)
+        assert len(payloads[0].get("devices", [])) == 2
+        assert (
+            "Skipping update for 88CE99D99E" in caplog.text
+            or "circular two-unit duplicate" in caplog.text
+        )
+
+    @pytest.mark.asyncio
+    async def test_process_update_no_location_change_exact_coordinates(
+        self, mocker, caplog
+    ):
         """Test that location comparison path is covered when coordinates match exactly."""
         # Create the buoy data to pass to the constructor
         user_id = "n9JpP3kk8vFVyNlzMnYZig9DnO475ztWV5JQ4z3RHwO19GPjN9sL8qDw8YgW"
         hashed_user_id = get_hashed_user_id(user_id)
         serial_number = "8899CEDAAA"
-
+
         mock_edgetech_buoy_data = {
             "serialNumber": serial_number,
             "userId": user_id,
@@ -1015,7 +1346,7 @@ async def test_process_update_no_location_change_exact_coordinates(self, mocker,
                 "endLonDeg": -68.167191,
                 "isTwoUnitLine": None,
                 "endUnit": None,
-                "startUnit": None
+                "startUnit": None,
             },
             "changeRecords": [
                 {
@@ -1027,16 +1358,20 @@ async def test_process_update_no_location_change_exact_coordinates(self, mocker,
                         "oldValue": None,
                         "newValue": "2025-05-25T17:53:19.517Z",
                     }
-            ]
+                ],
             }
-        ]
+            ],
         }
-
-        processor = EdgeTechProcessor(data=[mock_edgetech_buoy_data], er_token="test_token", er_url="http://test.com")
-
+
+        processor = EdgeTechProcessor(
+            data=[mock_edgetech_buoy_data],
+            er_token="test_token",
+            er_url="http://test.com",
+        )
+
         # Use the exact device_id that will be generated by the processor
         expected_device_id_primary = f"{serial_number}_{hashed_user_id}_A"
-
+
         # Create a mock device with location as a tuple (to work around the processor bug)
         mock_device = Mock()
         mock_device.device_id = expected_device_id_primary
@@ -1048,14 +1383,21 @@
         mock_gear = Mock()
         mock_gear.devices = [mock_device]
         mock_gear.manufacturer = "edgetech"  # This is important for the filtering
-        mock_gear.last_updated = datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc)  # Older than the buoy's lastUpdated
-        mock_gear.create_haul_observation = Mock(return_value=[])  # Return empty list to avoid the TypeError
+        mock_gear.status = "deployed"
+        mock_gear.last_updated = datetime(
+            2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc
+        )  # Older than the buoy's lastUpdated
+        mock_gear.create_haul_observation = Mock(
+            return_value=[]
+        )  # Return empty list to avoid the TypeError
 
         # Mock the ER client to return the gear
         mock_er_client = mocker.MagicMock()
         mock_er_client.get_er_gears = AsyncMock(return_value=[mock_gear])
         mock_er_client.get_sources = AsyncMock(return_value=[])
-        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None)
+        mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(
+            return_value=None
+        )
         processor._er_client = mock_er_client
 
         with caplog.at_level(logging.INFO):
@@ -1070,7 +1412,7 @@ async def test_identify_buoys_position_update_not_skipped_same_date_deployed(
     ):
         """
         Test that position updates are NOT skipped when dateDeployed is unchanged but lastUpdated is newer.
-
+
         This is the bug fix test: Previously, position updates were incorrectly skipped because
         the code used dateDeployed for recorded_at, and dateDeployed doesn't change on position
         updates. Now we use lastUpdated for updates, so position changes get a new recorded_at timestamp.
@@ -1118,7 +1460,9 @@ async def test_identify_buoys_position_update_not_skipped_same_date_deployed(
                 latitude=40.3499686, longitude=-71.573436  # OLD position
             ),
             last_updated=datetime(2026, 1, 15, 22, 40, 9, tzinfo=timezone.utc),
-            last_deployed=datetime(2026, 1, 15, 22, 40, 8, tzinfo=timezone.utc),  # SAME as EdgeTech
+            last_deployed=datetime(
+                2026, 1, 15, 22, 40, 8, tzinfo=timezone.utc
+            ),  # SAME as EdgeTech
         )
 
         mock_gear = BuoyGear(
@@ -1156,9 +1500,9 @@
     @pytest.mark.asyncio
     async def test_create_gear_payload_uses_last_updated_for_updates(self):
         """
-        Test that _create_gear_payload uses lastUpdated (not dateDeployed) for recorded_at 
+        Test that _create_gear_payload uses lastUpdated (not dateDeployed) for recorded_at
         when include_initial_deployment=False (i.e., for updates).
-
+
         This ensures position updates get a unique recorded_at timestamp.
         """
         processor = EdgeTechProcessor(data=[], er_token="token", er_url="url")
@@ -1179,7 +1523,7 @@ async def test_create_gear_payload_uses_last_updated_for_updates(self):
                 "modelNumber": "Model123",
                 "isDeployed": True,
                 "dateDeployed": "2026-01-15T22:40:08.000Z",  # Deployment time
-                "lastUpdated": "2026-01-15T22:42:49.000Z",   # Position update time (later)
+                "lastUpdated": "2026-01-15T22:42:49.000Z",  # Position update time (later)
             },
             "changeRecords": [],
         }
@@ -1195,7 +1539,9 @@ async def test_create_gear_payload_uses_last_updated_for_updates(self):
         )
 
         # For initial deployment, recorded_at should be dateDeployed
-        assert payload_initial["devices"][0]["recorded_at"] == "2026-01-15T22:40:08+00:00"
+        assert (
+            payload_initial["devices"][0]["recorded_at"] == "2026-01-15T22:40:08+00:00"
+        )
 
         # Test for UPDATE (include_initial_deployment=False)
         payload_update = await processor._create_gear_payload(
@@ -1206,16 +1552,21 @@ async def test_create_gear_payload_uses_last_updated_for_updates(self):
         )
 
         # For updates, recorded_at should be lastUpdated (the fix!)
-        assert payload_update["devices"][0]["recorded_at"] == "2026-01-15T22:42:49+00:00"
+        assert (
+            payload_update["devices"][0]["recorded_at"] == "2026-01-15T22:42:49+00:00"
+        )
 
         # Verify the timestamps are different
-        assert payload_initial["devices"][0]["recorded_at"] != payload_update["devices"][0]["recorded_at"]
+        assert (
+            payload_initial["devices"][0]["recorded_at"]
+            != payload_update["devices"][0]["recorded_at"]
+        )
 
     @pytest.mark.asyncio
     async def test_position_update_end_to_end(self, mocker, caplog):
         """
         End-to-end test that verifies position updates are processed correctly.
-
+
         Scenario: A deployed gear has its position updated in EdgeTech, but dateDeployed
         remains unchanged. The update should be processed and create a payload with
         the new position and lastUpdated as recorded_at.
@@ -1237,7 +1588,7 @@ async def test_position_update_end_to_end(self, mocker, caplog):
                 "modelNumber": "Model123",
                 "isDeployed": True,
                 "dateDeployed": "2026-01-15T10:00:00.000Z",  # SAME as ER
-                "lastUpdated": "2026-01-15T12:00:00.000Z",   # NEWER
+                "lastUpdated": "2026-01-15T12:00:00.000Z",  # NEWER
             },
             "changeRecords": [],
         }
@@ -1328,11 +1679,11 @@ async def test_process_deploy_end_unit_record_skip_line_258(self, mocker, caplog
                 "endLonDeg": -68.16757,
                 "isTwoUnitLine": False,
                 "endUnit": None,
-                "startUnit": None
+                "startUnit": None,
             },
-            "changeRecords": []
+            "changeRecords": [],
         }
-
+
         # Create end unit record that should trigger line 258 skip
         # This record has both endUnit (so partner can be found) AND startUnit (so it's skipped)
         end_unit_data = {
@@ -1366,19 +1717,23 @@ async def test_process_deploy_end_unit_record_skip_line_258(self, mocker, caplog
                 "endLonDeg": -68.16757,
                 "isTwoUnitLine": True,
                 "endUnit": "COMPANION789",  # This allows the end unit to be found (avoids line 255)
-                "startUnit": "START123"  # This triggers line 258 skip!
+                "startUnit": "START123",  # This triggers line 258 skip!
}, - "changeRecords": [] + "changeRecords": [], } - + # Process both records - so the companion can be found in the data - processor = EdgeTechProcessor(data=[companion_data, end_unit_data], er_token="token", er_url="url") + processor = EdgeTechProcessor( + data=[companion_data, end_unit_data], er_token="token", er_url="url" + ) # Mock ER client to return no existing gears (deploy scenario) mock_er_client = mocker.MagicMock() mock_er_client.get_er_gears = AsyncMock(return_value=[]) mock_er_client.get_sources = AsyncMock(return_value=[]) - mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock(return_value=None) + mock_er_client.get_existing_source_id_by_manufacturer_id = AsyncMock( + return_value=None + ) processor._er_client = mock_er_client payloads = await processor.process() @@ -1388,3 +1743,241 @@ async def test_process_deploy_end_unit_record_skip_line_258(self, mocker, caplog assert len(payloads) == 1 assert payloads[0]["deployment_type"] == "trawl" assert len(payloads[0]["devices"]) == 2 + + @pytest.mark.asyncio + async def test_identify_buoys_hauled_in_er_deployed_in_edgetech( + self, mocker, a_new_edgetech_trawl_record + ): + """When ER gear is hauled but EdgeTech shows deployed, treat as new deployment.""" + processor = EdgeTechProcessor( + data=[a_new_edgetech_trawl_record], er_token="token", er_url="url" + ) + + user_id = a_new_edgetech_trawl_record["userId"] + serial_number = a_new_edgetech_trawl_record["serialNumber"] + hashed_user_id = get_hashed_user_id(user_id) + device_id = f"{serial_number}_{hashed_user_id}_A" + + mock_device = BuoyDevice( + device_id="some-uuid", + mfr_device_id=device_id, + label="Device A", + location=DeviceLocation(latitude=44.0, longitude=-68.0), + last_updated=datetime.now(timezone.utc), + last_deployed=datetime(2025, 5, 20, 10, 0, 0, tzinfo=timezone.utc), + ) + + mock_gear = BuoyGear( + id=uuid4(), + display_id="GEAR123", + status="hauled", # Already hauled in ER + last_updated=datetime.now(timezone.utc) - 
timedelta(hours=1), + devices=[mock_device], + type="trawl", + manufacturer="edgetech", + ) + + er_gears_devices_id_to_gear = {device_id: mock_gear} + buoy_key = f"{serial_number}/{hashed_user_id}" + serial_number_to_edgetech_buoy = {buoy_key: processor._data[0]} + + to_deploy, to_haul, to_update = await processor._identify_buoys( + er_gears_devices_id_to_gear, serial_number_to_edgetech_buoy + ) + + assert buoy_key in to_deploy + assert len(to_haul) == 0 + assert len(to_update) == 0 + + @pytest.mark.asyncio + async def test_redeployment_haul_uses_change_record_date_recovered(self): + """For re-deployments, haul payload should use dateRecovered from changeRecords + when currentState dateRecovered is null (cleared by the redeploy).""" + user_id = "66fff43f7386585d6687e3d3" + serial_number = "88CE99D39E" + hashed_user_id = get_hashed_user_id(user_id) + + # Simulate the re-deployment scenario: buoy hauled then redeployed within seconds + buoy_data = { + "serialNumber": serial_number, + "userId": user_id, + "currentState": { + "etag": "1773845753701", + "isDeleted": False, + "serialNumber": serial_number, + "releaseCommand": "C8AB8C769E", + "statusCommand": serial_number, + "idCommand": "CCCCCCCCCC", + "latDeg": 43.8309358, + "lonDeg": -69.6456942, + "endLatDeg": 43.8315133, + "endLonDeg": -69.6456975, + "modelNumber": "5112", + "isDeployed": True, + "dateDeployed": "2026-03-18T14:55:53.191Z", + "dateRecovered": None, # Cleared by redeploy + "recoveredLatDeg": None, # Cleared by redeploy + "recoveredLonDeg": None, + "lastUpdated": "2026-03-18T14:55:53.701Z", + }, + "changeRecords": [ + { + "type": "MODIFY", + "timestamp": "2026-03-18T14:55:53.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": None, + "newValue": "2026-03-18T14:55:53.191Z", + }, + { + "key": "dateRecovered", + "oldValue": "2026-03-18T14:48:26.078Z", + "newValue": None, + }, + {"key": "isDeployed", "oldValue": False, "newValue": True}, + { + "key": "recoveredLatDeg", + "oldValue": 
43.8495062, + "newValue": None, + }, + { + "key": "recoveredLonDeg", + "oldValue": -69.6290056, + "newValue": None, + }, + ], + }, + { + "type": "MODIFY", + "timestamp": "2026-03-18T14:55:07.000Z", + "changes": [ + { + "key": "dateDeployed", + "oldValue": "2026-03-10T11:16:57.467Z", + "newValue": None, + }, + { + "key": "dateRecovered", + "oldValue": None, + "newValue": "2026-03-18T14:48:26.078Z", + }, + {"key": "isDeployed", "oldValue": True, "newValue": False}, + { + "key": "recoveredLatDeg", + "oldValue": None, + "newValue": 43.8495062, + }, + { + "key": "recoveredLonDeg", + "oldValue": None, + "newValue": -69.6290056, + }, + { + "key": "recoveredRangeM", + "oldValue": None, + "newValue": 50.932, + }, + ], + }, + ], + } + + buoy = Buoy.parse_obj(buoy_data) + processor = EdgeTechProcessor(data=[buoy_data], er_token="token", er_url="url") + + # Create the ER gear that was deployed earlier (March 10) + device_id_a = f"{serial_number}_{hashed_user_id}_A" + mock_device = BuoyDevice( + device_id="existing-uuid", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation(latitude=43.8308732, longitude=-69.6453918), + last_updated=datetime(2026, 3, 10, 11, 16, 57, tzinfo=timezone.utc), + last_deployed=datetime(2026, 3, 10, 11, 16, 57, tzinfo=timezone.utc), + ) + mock_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-OLD", + status="deployed", + last_updated=datetime(2026, 3, 10, 11, 16, 57, tzinfo=timezone.utc), + devices=[mock_device], + type="single", + manufacturer="edgetech", + ) + + # Test _create_haul_payload uses dateRecovered from changeRecords + payload = processor._create_haul_payload(er_gear=mock_gear, edgetech_buoy=buoy) + + # The haul recorded_at should be from the changeRecords dateRecovered (14:48:26) + # NOT from lastUpdated (14:55:53) which would collide with the deploy + assert payload["devices"][0]["recorded_at"] == "2026-03-18T14:48:26+00:00" + + # Recovery location should also come from changeRecords + assert 
payload["devices"][0]["location"]["latitude"] == 43.8495062 + assert payload["devices"][0]["location"]["longitude"] == -69.6290056 + + @pytest.mark.asyncio + async def test_redeployment_identify_haul_and_deploy(self): + """Re-deployment: same serial hauled and redeployed within seconds should + produce both a haul and a deploy.""" + user_id = "66fff43f7386585d6687e3d3" + serial_number = "88CE99D39E" + hashed_user_id = get_hashed_user_id(user_id) + + buoy_data = { + "serialNumber": serial_number, + "userId": user_id, + "currentState": { + "etag": "1773845753701", + "isDeleted": False, + "serialNumber": serial_number, + "releaseCommand": "C8AB8C769E", + "statusCommand": serial_number, + "idCommand": "CCCCCCCCCC", + "latDeg": 43.8309358, + "lonDeg": -69.6456942, + "endLatDeg": 43.8315133, + "endLonDeg": -69.6456975, + "modelNumber": "5112", + "isDeployed": True, + "dateDeployed": "2026-03-18T14:55:53.191Z", + "dateRecovered": None, + "lastUpdated": "2026-03-18T14:55:53.701Z", + }, + "changeRecords": [], + } + + processor = EdgeTechProcessor(data=[buoy_data], er_token="token", er_url="url") + + device_id_a = f"{serial_number}_{hashed_user_id}_A" + mock_device = BuoyDevice( + device_id="existing-uuid", + mfr_device_id=device_id_a, + label="Device A", + location=DeviceLocation(latitude=43.8308732, longitude=-69.6453918), + last_updated=datetime(2026, 3, 10, 11, 16, 57, tzinfo=timezone.utc), + last_deployed=datetime(2026, 3, 10, 11, 16, 57, tzinfo=timezone.utc), + ) + mock_gear = BuoyGear( + id=uuid4(), + display_id="GEAR-OLD", + status="deployed", + last_updated=datetime(2026, 3, 10, 11, 16, 57, tzinfo=timezone.utc), + devices=[mock_device], + type="single", + manufacturer="edgetech", + ) + + er_gears_devices_id_to_gear = {device_id_a: mock_gear} + buoy_key = f"{serial_number}/{hashed_user_id}" + serial_number_to_edgetech_buoy = {buoy_key: processor._data[0]} + + to_deploy, to_haul, to_update = await processor._identify_buoys( + er_gears_devices_id_to_gear, 
serial_number_to_edgetech_buoy + ) + + # Should be in both haul and deploy (re-deployment) + assert buoy_key in to_haul + assert buoy_key in to_deploy + assert len(to_update) == 0