6 changes: 4 additions & 2 deletions README.md
@@ -365,7 +365,9 @@ After filtering, we compare EdgeTech data with our existing Earth Ranger records
- Action: **Haul** the existing gear (close it), then **Deploy** a new gear set with a new `set_id`
- **Processing order**: Haul payload is sent first, then the new deployment payload, so the previous gear is closed before the new one is created
- **Why**: When the same serial(s) are deployed again (e.g. same two-unit pair with a new `dateDeployed`), we close the previous deployment in ER/Buoy and create a new one instead of updating the old gear in place
-- **Haul timestamp for re-deployments**: When a buoy is hauled and redeployed within seconds, the `currentState.dateRecovered` is cleared by the redeploy. In this case, the haul payload’s `recorded_at` is sourced from the most recent `dateRecovered` in `changeRecords` to avoid colliding with the deploy’s `recorded_at` (which uses `dateDeployed`). Recovery location (`recoveredLatDeg`/`recoveredLonDeg`) is also recovered from `changeRecords` in the same way.
+- **Haul timestamp for re-deployments**: The haul payload’s `recorded_at` must not collide with the deploy payload’s `recorded_at` (which uses `dateDeployed`), since both share the same `device_id` and ER enforces a `(device_id, recorded_at)` unique constraint. Timestamps are truncated to seconds (`_remove_milliseconds`), so values within the same second will collide.
+  - **With `dateRecovered` in `changeRecords`**: When a buoy is hauled and redeployed within seconds, `currentState.dateRecovered` is cleared by the redeploy. The haul `recorded_at` is sourced from the most recent `dateRecovered` in `changeRecords`. Recovery location (`recoveredLatDeg`/`recoveredLonDeg`) is also recovered from `changeRecords`.
+  - **Without `dateRecovered` (skipped haul)**: EdgeTech may skip the haul stage entirely — the gear goes straight from one deployment to another with no `dateRecovered` anywhere. In this case, `lastUpdated` and `dateDeployed` are typically within the same second (both reflect the new deployment), so the haul uses `dateDeployed - 1 second` as a deterministic `recorded_at` that is guaranteed not to collide with the deploy payload.

**2a. DEPLOY (Recovery from missed deployment)**
- Buoy exists in **both** EdgeTech and Earth Ranger, but ER gear `status` is **not** `"deployed"` (e.g. `"hauled"`)
@@ -1134,7 +1136,7 @@ This integration provides robust synchronization between EdgeTech's Trap Tracker
✅ **Efficient database dump** mechanism for bulk data retrieval
✅ **Intelligent filtering** to process active and explicitly hauled buoys
✅ **Explicit status-based haul detection** (not inferred from absence)
-✅ **Re-deployment handling**: when EdgeTech `dateDeployed` is meaningfully later than ER’s deployment, we close the previous gear and create a new one (hauls sent before new deployments). Haul timestamps and recovery locations are sourced from `changeRecords` when a rapid haul+redeploy clears `currentState`.
+✅ **Re-deployment handling**: when EdgeTech `dateDeployed` is meaningfully later than ER’s deployment, we close the previous gear and create a new one (hauls sent before new deployments). Haul timestamps and recovery locations are sourced from `changeRecords` when a rapid haul+redeploy clears `currentState`. When EdgeTech skips the haul stage entirely (no `dateRecovered` anywhere), the synthetic haul uses `dateDeployed - 1s` as a deterministic `recorded_at` to avoid collisions with the deploy payload.
✅ **Recovery from missed deployments**: if a re-deployment haul succeeded but the deploy failed, subsequent runs detect the hauled ER gear vs deployed EdgeTech state and create the missing deployment
✅ **Set ID resolution** to correctly update existing vs create new gear sets
✅ **Support for complex systems** including two-unit lines
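
The collision described in the README change can be reproduced with the timestamps used in the new test. This is a minimal sketch, assuming `_remove_milliseconds` simply zeroes the sub-second part of a `datetime` (its body is not shown in this diff); `remove_milliseconds` below is a hypothetical stand-in, not the processor's method.

```python
from datetime import datetime, timedelta, timezone


def remove_milliseconds(dt: datetime) -> datetime:
    # Hypothetical stand-in for the processor's _remove_milliseconds helper:
    # assumed to drop everything below one second.
    return dt.replace(microsecond=0)


# Values taken from the skipped-haul test case in this PR.
date_deployed = datetime(2026, 3, 28, 14, 35, 0, 167000, tzinfo=timezone.utc)
last_updated = datetime(2026, 3, 28, 14, 35, 0, 700000, tzinfo=timezone.utc)

deploy_recorded_at = remove_milliseconds(date_deployed)

# Old fallback: lastUpdated truncates to the same second as dateDeployed.
naive_haul_recorded_at = remove_milliseconds(last_updated)

# New behaviour: dateDeployed - 1s always lands in the previous second.
fixed_haul_recorded_at = remove_milliseconds(date_deployed - timedelta(seconds=1))

collides_before_fix = naive_haul_recorded_at == deploy_recorded_at
collides_after_fix = fixed_haul_recorded_at == deploy_recorded_at
```

Subtracting a whole second before truncation shifts the result by exactly one second, so the haul and deploy values cannot be equal regardless of the sub-second part.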
34 changes: 25 additions & 9 deletions app/actions/edgetech/processor.py
@@ -231,7 +231,10 @@ async def _create_gear_payload(
         return payload

     def _create_haul_payload(
-        self, er_gear: BuoyGear, edgetech_buoy: Optional[Buoy] = None
+        self,
+        er_gear: BuoyGear,
+        edgetech_buoy: Optional[Buoy] = None,
+        is_redeployment: bool = False,
     ) -> Dict[str, Any]:
         """
         Create a haul payload from an existing ER gear.
@@ -242,6 +245,11 @@ def _create_haul_payload(
         Args:
             er_gear: The existing gear from ER
             edgetech_buoy: Optional EdgeTech buoy data with potential recovery location
+            is_redeployment: Whether this haul is part of a re-deployment sequence.
+                When True and no dateRecovered is found, uses dateDeployed - 1s
+                as a deterministic recorded_at instead of falling back to
+                lastUpdated, which could collide with the subsequent deploy
+                payload's recorded_at after millisecond truncation.

         Returns:
             Dict in the format expected by /api/v1/gear/ POST endpoint
@@ -254,10 +262,9 @@ def _create_haul_payload(
         recovery_lon = None

         # Determine the recorded_at timestamp for the haul event
-        # Use dateRecovered if available; for re-deployments (haul then immediate
-        # redeploy) the currentState dateRecovered is cleared, so check changeRecords
-        # for the most recent dateRecovered value. Fall back to lastUpdated.
-        haul_recorded_at = datetime.now(timezone.utc)
+        # Priority: dateRecovered from currentState → dateRecovered from changeRecords
+        # → dateDeployed - 1s (re-deployments without dateRecovered) → lastUpdated.
+        haul_recorded_at = self._utcnow()
         if edgetech_buoy:
             if edgetech_buoy.currentState.dateRecovered:
                 haul_recorded_at = edgetech_buoy.currentState.dateRecovered
@@ -274,6 +281,15 @@ def _create_haul_payload(
                         f"Using dateRecovered from changeRecords for haul of "
                         f"{edgetech_buoy.serialNumber}: {recovered_at_from_changes}"
                     )
+                elif is_redeployment and edgetech_buoy.currentState.dateDeployed:
+                    # For re-deployments without dateRecovered, lastUpdated and
+                    # dateDeployed are typically in the same second (both reflect
+                    # the new deployment). Use dateDeployed - 1s as a deterministic
+                    # haul timestamp that is guaranteed not to collide with the
+                    # deploy payload's recorded_at after millisecond truncation.
+                    haul_recorded_at = (
+                        edgetech_buoy.currentState.dateDeployed - timedelta(seconds=1)
+                    )
                 elif edgetech_buoy.currentState.lastUpdated:
                     haul_recorded_at = edgetech_buoy.currentState.lastUpdated

@@ -323,9 +339,7 @@ def _create_haul_payload(
                     if device.last_deployed
                     else device.last_updated.isoformat()
                 ),
-                "last_updated": self._remove_milliseconds(
-                    datetime.now(timezone.utc)
-                ).isoformat(),
+                "last_updated": self._remove_milliseconds(self._utcnow()).isoformat(),
                 "recorded_at": self._remove_milliseconds(haul_recorded_at).isoformat(),
                 "device_status": "hauled",
                 "location": {
@@ -847,7 +861,9 @@ async def process(self) -> List[Dict[str, Any]]:

             try:
                 payload = self._create_haul_payload(
-                    er_gear=er_gear, edgetech_buoy=edgetech_buoy
+                    er_gear=er_gear,
+                    edgetech_buoy=edgetech_buoy,
+                    is_redeployment=serial_number_user_id in to_deploy,
                 )
                 gear_payloads.append(payload)
                 haul_gears_processed.add(er_gear.display_id)
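
The fallback order that the hunks above give `haul_recorded_at` can be summarised as a standalone function. This is only a sketch: `pick_haul_recorded_at` and its parameters are hypothetical names, the `changeRecords` lookup is passed in as an already-resolved value because that code is not visible in this diff, and `now` stands in for `self._utcnow()`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def pick_haul_recorded_at(
    date_recovered: Optional[datetime],
    date_recovered_from_changes: Optional[datetime],
    date_deployed: Optional[datetime],
    last_updated: Optional[datetime],
    is_redeployment: bool,
    now: datetime,
) -> datetime:
    """Hypothetical helper mirroring the priority order described in the diff."""
    if date_recovered:
        # 1. currentState.dateRecovered
        return date_recovered
    if date_recovered_from_changes:
        # 2. most recent dateRecovered found in changeRecords
        return date_recovered_from_changes
    if is_redeployment and date_deployed:
        # 3. skipped haul during a re-deployment: one second before the deploy
        return date_deployed - timedelta(seconds=1)
    if last_updated:
        # 4. ordinary haul without any dateRecovered
        return last_updated
    # 5. nothing usable on the buoy: current time
    return now


deployed = datetime(2026, 3, 28, 14, 35, 0, 167000, tzinfo=timezone.utc)
updated = datetime(2026, 3, 28, 14, 35, 0, 700000, tzinfo=timezone.utc)
now = datetime(2026, 3, 28, 15, 0, 0, tzinfo=timezone.utc)

skipped_haul = pick_haul_recorded_at(None, None, deployed, updated, True, now)
plain_haul = pick_haul_recorded_at(None, None, deployed, updated, False, now)
no_data = pick_haul_recorded_at(None, None, None, None, True, now)
```

Only the third branch depends on `is_redeployment`, which is consistent with the flag defaulting to `False` in the new signature so that callers not passing it keep the previous fallback to `lastUpdated`.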
103 changes: 103 additions & 0 deletions app/actions/tests/test_edgetech_processor.py
@@ -1994,3 +1994,106 @@ async def test_redeployment_identify_haul_and_deploy(self):
         assert buoy_key in to_haul
         assert buoy_key in to_deploy
         assert len(to_update) == 0
+
+    def test_redeployment_haul_without_date_recovered_avoids_recorded_at_collision(
+        self,
+    ):
+        """When EdgeTech skips the haul stage (no dateRecovered anywhere) and
+        dateDeployed/lastUpdated fall in the same second, the haul payload must
+        NOT reuse lastUpdated as recorded_at — that would collide with the
+        deploy payload's recorded_at after millisecond truncation."""
+        user_id = "684b1ec1c1df05abfa78b756"
+        serial_number = "88CE99D98B"
+        hashed_user_id = get_hashed_user_id(user_id)
+
+        # Real-world data: EdgeTech skipped haul, went straight to new deployment.
+        # dateDeployed and lastUpdated are in the same second.
+        buoy_data = {
+            "serialNumber": serial_number,
+            "userId": user_id,
+            "currentState": {
+                "etag": "1774708500700",
+                "isDeleted": False,
+                "serialNumber": serial_number,
+                "releaseCommand": "C8AB8CDC8B",
+                "statusCommand": serial_number,
+                "idCommand": "CCCCCCCCCC",
+                "latDeg": 42.4224917,
+                "lonDeg": -70.6649972,
+                "endLatDeg": 42.4268093,
+                "endLonDeg": -70.6696386,
+                "modelNumber": "5112",
+                "isDeployed": True,
+                "dateDeployed": "2026-03-28T14:35:00.167Z",
+                "lastUpdated": "2026-03-28T14:35:00.700Z",
+            },
+            "changeRecords": [
+                {
+                    "type": "MODIFY",
+                    "timestamp": "2026-03-28T14:35:00.000Z",
+                    "changes": [
+                        {
+                            "key": "dateDeployed",
+                            "oldValue": "2026-03-05T15:15:45.573Z",
+                            "newValue": "2026-03-28T14:35:00.167Z",
+                        },
+                        {
+                            "key": "latDeg",
+                            "oldValue": 42.4275164,
+                            "newValue": 42.4224917,
+                        },
+                        {
+                            "key": "lonDeg",
+                            "oldValue": -70.6701861,
+                            "newValue": -70.6649972,
+                        },
+                    ],
+                }
+            ],
+        }
+
+        buoy = Buoy.parse_obj(buoy_data)
+        processor = EdgeTechProcessor(data=[buoy_data], er_token="token", er_url="url")
+
+        device_id_a = f"{serial_number}_{hashed_user_id}_A"
+        mock_device = BuoyDevice(
+            device_id="existing-uuid",
+            mfr_device_id=device_id_a,
+            label="Device A",
+            location=DeviceLocation(latitude=42.4275164, longitude=-70.6701861),
+            last_updated=datetime(2026, 3, 5, 15, 15, 45, tzinfo=timezone.utc),
+            last_deployed=datetime(2026, 3, 5, 15, 15, 45, tzinfo=timezone.utc),
+        )
+        mock_gear = BuoyGear(
+            id=uuid4(),
+            display_id="GEAR-OLD",
+            status="deployed",
+            last_updated=datetime(2026, 3, 5, 15, 15, 45, tzinfo=timezone.utc),
+            devices=[mock_device],
+            type="single",
+            manufacturer="edgetech",
+        )
+
+        # Haul payload for re-deployment (is_redeployment=True)
+        haul_payload = processor._create_haul_payload(
+            er_gear=mock_gear, edgetech_buoy=buoy, is_redeployment=True
+        )
+
+        # The haul recorded_at should be dateDeployed - 1s (deterministic, no
+        # collision with the deploy payload's recorded_at after truncation).
+        haul_recorded_at = haul_payload["devices"][0]["recorded_at"]
+        deploy_recorded_at = processor._remove_milliseconds(
+            buoy.currentState.dateDeployed
+        ).isoformat()
+        expected_haul_recorded_at = processor._remove_milliseconds(
+            buoy.currentState.dateDeployed - timedelta(seconds=1)
+        ).isoformat()
+
+        assert haul_recorded_at == expected_haul_recorded_at, (
+            f"Haul recorded_at ({haul_recorded_at}) must be dateDeployed - 1s "
+            f"({expected_haul_recorded_at})"
+        )
+        assert haul_recorded_at != deploy_recorded_at, (
+            f"Haul recorded_at ({haul_recorded_at}) must differ from deploy "
+            f"recorded_at ({deploy_recorded_at}) to avoid ER unique constraint collision"
+        )