Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions erclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,39 @@ def get_sources(self, page_size=100):
else:
break

# ---- GPX upload ----

def upload_gpx(self, source_id, filepath=None, file=None):
"""
Upload a GPX file for a source. Returns an async task reference.

:param source_id: The source UUID
:param filepath: Path to a .gpx file on disk (mutually exclusive with file)
:param file: An already-opened file-like object (mutually exclusive with filepath)
:return: Task data dict (contains task_id for status polling)
"""
self.logger.debug(f'Uploading GPX for source {source_id}')
path = f'source/{source_id}/gpxdata'
if file:
files = {'gpx_file': file}
return self._post_form(path, files=files)
elif filepath:
with open(filepath, 'rb') as f:
files = {'gpx_file': f}
return self._post_form(path, files=files)
Comment on lines +1069 to +1073
else:
raise ValueError('Either filepath or file must be provided')
Comment on lines +1056 to +1075

def get_gpx_upload_status(self, source_id, task_id):
"""
Check the status of a GPX upload task.

:param source_id: The source UUID
:param task_id: The task ID returned from upload_gpx
:return: Task status data
"""
return self._get(f'source/{source_id}/gpxdata/status/{task_id}')

def get_users(self):
return self._get('users')

Expand Down Expand Up @@ -1613,6 +1646,39 @@ async def get_feature_group(self, feature_group_id: str):
"""
return await self._get(f"spatialfeaturegroup/{feature_group_id}", params={})

# ---- GPX upload ----

async def upload_gpx(self, source_id, filepath=None, file=None):
"""
Upload a GPX file for a source. Returns an async task reference.

:param source_id: The source UUID
:param filepath: Path to a .gpx file on disk (mutually exclusive with file)
:param file: An already-opened file-like object (mutually exclusive with filepath)
:return: Task data dict (contains task_id for status polling)
"""
self.logger.debug(f'Uploading GPX for source {source_id}')
path = f'source/{source_id}/gpxdata'
if file:
files = {'gpx_file': file}
return await self._post_form(path, files=files)
elif filepath:
with open(filepath, 'rb') as f:
files = {'gpx_file': f}
return await self._post_form(path, files=files)
else:
raise ValueError('Either filepath or file must be provided')
Comment on lines +1651 to +1670

async def get_gpx_upload_status(self, source_id, task_id):
"""
Check the status of a GPX upload task.

:param source_id: The source UUID
:param task_id: The task ID returned from upload_gpx
:return: Task status data
"""
return await self._get(f'source/{source_id}/gpxdata/status/{task_id}')

async def _get_data(self, endpoint, params, batch_size=0):
if "page" not in params: # Use cursor paginator unless the user has specified a page
params["use_cursor"] = "true"
Expand Down
294 changes: 294 additions & 0 deletions tests/async_client/test_gpx_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
import io
import json
import re

import httpx
import pytest
import respx

Comment on lines +1 to +8
from erclient import (
ERClientException,
ERClientNotFound,
ERClientPermissionDenied,
ERClientServiceUnreachable,
)


SOURCE_ID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
TASK_ID = "f9e8d7c6-b5a4-3210-fedc-ba9876543210"


@pytest.fixture
def gpx_file_content():
return b"""<?xml version="1.0" encoding="UTF-8"?>
<gpx version="1.1" creator="test">
<trk>
<name>Test Track</name>
<trkseg>
<trkpt lat="-1.293" lon="36.792">
<time>2026-01-15T10:00:00Z</time>
</trkpt>
<trkpt lat="-1.294" lon="36.793">
<time>2026-01-15T10:01:00Z</time>
</trkpt>
</trkseg>
</trk>
</gpx>"""


@pytest.fixture
def gpx_file(gpx_file_content):
return io.BytesIO(gpx_file_content)


@pytest.fixture
def gpx_upload_response():
return {
"data": {
"source_id": SOURCE_ID,
"filename": "track.gpx",
"filesize_bytes": 512,
"process_status": {
"task_info": None,
"task_id": TASK_ID,
"task_success": False,
"task_failed": False,
"task_url": f"https://fake-site.erdomain.org/api/v1.0/source/{SOURCE_ID}/gpxdata/status/{TASK_ID}",
},
},
"status": {"code": 201, "message": "Created"},
}


@pytest.fixture
def gpx_status_pending_response():
return {
"data": {
"task_result": None,
"task_status": "Pending",
"task_success": False,
"task_failed": False,
},
"status": {"code": 200, "message": "OK"},
}


@pytest.fixture
def gpx_status_success_response():
return {
"data": {
"task_result": {"observations_created": 42},
"task_status": "Success",
"task_success": True,
"task_failed": False,
},
"status": {"code": 200, "message": "OK"},
}


# ---- upload_gpx tests ----


@pytest.mark.asyncio
async def test_upload_gpx_with_file_object_success(er_client, gpx_file, gpx_upload_response):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
route = respx_mock.post(f"source/{SOURCE_ID}/gpxdata")
route.return_value = httpx.Response(
httpx.codes.CREATED, json=gpx_upload_response
)
response = await er_client.upload_gpx(source_id=SOURCE_ID, file=gpx_file)
assert route.called
assert response == gpx_upload_response["data"]
assert response["process_status"]["task_id"] == TASK_ID
await er_client.close()


@pytest.mark.asyncio
async def test_upload_gpx_with_filepath_success(er_client, gpx_file_content, gpx_upload_response, tmp_path):
gpx_path = tmp_path / "test_track.gpx"
gpx_path.write_bytes(gpx_file_content)

async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
route = respx_mock.post(f"source/{SOURCE_ID}/gpxdata")
route.return_value = httpx.Response(
httpx.codes.CREATED, json=gpx_upload_response
)
response = await er_client.upload_gpx(source_id=SOURCE_ID, filepath=str(gpx_path))
assert route.called
assert response == gpx_upload_response["data"]
await er_client.close()


@pytest.mark.asyncio
async def test_upload_gpx_no_file_raises_error(er_client):
with pytest.raises(ValueError, match="Either filepath or file must be provided"):
await er_client.upload_gpx(source_id=SOURCE_ID)
await er_client.close()


@pytest.mark.asyncio
async def test_upload_gpx_not_found(er_client, gpx_file):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
route = respx_mock.post(f"source/{SOURCE_ID}/gpxdata")
route.return_value = httpx.Response(
httpx.codes.NOT_FOUND,
json={"status": {"code": 404, "detail": "source not found"}},
)
with pytest.raises(ERClientNotFound):
await er_client.upload_gpx(source_id=SOURCE_ID, file=gpx_file)
assert route.called
await er_client.close()


@pytest.mark.asyncio
async def test_upload_gpx_forbidden(er_client, gpx_file):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
path = f"source/{SOURCE_ID}/gpxdata"
route = respx_mock.post(path)
forbidden_response = {
"data": [],
"status": {
"code": 403,
"message": "Forbidden",
"detail": "You do not have permission to perform this action.",
},
}
route.return_value = httpx.Response(
httpx.codes.FORBIDDEN, json=forbidden_response
)
with pytest.raises(ERClientPermissionDenied) as exc_info:
await er_client.upload_gpx(source_id=SOURCE_ID, file=gpx_file)
assert exc_info.value.status_code == httpx.codes.FORBIDDEN
assert route.called
await er_client.close()


@pytest.mark.asyncio
async def test_upload_gpx_timeout(er_client, gpx_file):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
route = respx_mock.post(f"source/{SOURCE_ID}/gpxdata")
route.side_effect = httpx.ConnectTimeout
with pytest.raises(ERClientException):
await er_client.upload_gpx(source_id=SOURCE_ID, file=gpx_file)
assert route.called
await er_client.close()


@pytest.mark.asyncio
async def test_upload_gpx_gateway_timeout(er_client, gpx_file):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
path = f"source/{SOURCE_ID}/gpxdata"
route = respx_mock.post(path)
route.return_value = httpx.Response(
httpx.codes.GATEWAY_TIMEOUT, json={}
)
with pytest.raises(ERClientServiceUnreachable) as exc_info:
await er_client.upload_gpx(source_id=SOURCE_ID, file=gpx_file)
assert "Gateway Timeout" in str(exc_info.value)
assert "POST" in str(exc_info.value)
assert "gpxdata" in str(exc_info.value)
assert route.called
await er_client.close()


# ---- get_gpx_upload_status tests ----


@pytest.mark.asyncio
async def test_get_gpx_upload_status_pending(er_client, gpx_status_pending_response):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
route = respx_mock.get(f"source/{SOURCE_ID}/gpxdata/status/{TASK_ID}")
route.return_value = httpx.Response(
httpx.codes.OK, json=gpx_status_pending_response
)
response = await er_client.get_gpx_upload_status(source_id=SOURCE_ID, task_id=TASK_ID)
assert route.called
assert response == gpx_status_pending_response["data"]
assert response["task_status"] == "Pending"
assert response["task_success"] is False
await er_client.close()


@pytest.mark.asyncio
async def test_get_gpx_upload_status_success(er_client, gpx_status_success_response):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
route = respx_mock.get(f"source/{SOURCE_ID}/gpxdata/status/{TASK_ID}")
route.return_value = httpx.Response(
httpx.codes.OK, json=gpx_status_success_response
)
response = await er_client.get_gpx_upload_status(source_id=SOURCE_ID, task_id=TASK_ID)
assert route.called
assert response == gpx_status_success_response["data"]
assert response["task_status"] == "Success"
assert response["task_success"] is True
assert response["task_result"]["observations_created"] == 42
await er_client.close()


@pytest.mark.asyncio
async def test_get_gpx_upload_status_not_found(er_client):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
route = respx_mock.get(f"source/{SOURCE_ID}/gpxdata/status/{TASK_ID}")
route.return_value = httpx.Response(
httpx.codes.NOT_FOUND,
json={"status": {"code": 404, "detail": "not found"}},
)
with pytest.raises(ERClientNotFound):
await er_client.get_gpx_upload_status(source_id=SOURCE_ID, task_id=TASK_ID)
assert route.called
await er_client.close()


@pytest.mark.asyncio
async def test_get_gpx_upload_status_forbidden(er_client):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
path = f"source/{SOURCE_ID}/gpxdata/status/{TASK_ID}"
route = respx_mock.get(path)
forbidden_response = {
"data": [],
"status": {
"code": 403,
"message": "Forbidden",
"detail": "You do not have permission to perform this action.",
},
}
route.return_value = httpx.Response(
httpx.codes.FORBIDDEN, json=forbidden_response
)
with pytest.raises(ERClientPermissionDenied):
await er_client.get_gpx_upload_status(source_id=SOURCE_ID, task_id=TASK_ID)
assert route.called
await er_client.close()


@pytest.mark.asyncio
async def test_get_gpx_upload_status_timeout(er_client):
async with respx.mock(
base_url=er_client._api_root("v1.0"), assert_all_called=False
) as respx_mock:
route = respx_mock.get(f"source/{SOURCE_ID}/gpxdata/status/{TASK_ID}")
route.side_effect = httpx.ReadTimeout
with pytest.raises(ERClientException):
await er_client.get_gpx_upload_status(source_id=SOURCE_ID, task_id=TASK_ID)
assert route.called
await er_client.close()
Loading
Loading