Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 72 additions & 33 deletions erclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,42 @@ def get_event_types(self, include_inactive=False, include_schema=False, version=
def get_event_schema(self, event_type):
return self._get(f'activity/events/schema/eventtype/{event_type}')

def get_event_type_schema(self, event_type_value, pre_render=False):
"""
Get the schema for a single event type (v2 API).

:param event_type_value: The event type 'value' identifier (e.g. 'rainfall_rep')
:param pre_render: If True, return the pre-rendered schema
"""
return self._get(
f'eventtypes/{event_type_value}/schema',
base_url=self._api_root('v2.0'),
params={'pre_render': pre_render},
)

def get_event_type_updates(self, event_type_value):
"""
Get the revision history for a single event type (v2 API).

:param event_type_value: The event type 'value' identifier
"""
return self._get(
f'eventtypes/{event_type_value}/updates',
base_url=self._api_root('v2.0'),
)

def get_event_type_schemas(self, pre_render=False):
"""
Get schemas for all event types in bulk (v2 API).

:param pre_render: If True, return pre-rendered schemas
"""
return self._get(
'eventtypes/schemas',
base_url=self._api_root('v2.0'),
params={'pre_render': pre_render},
)
Comment on lines +658 to +692
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new sync client methods get_event_type_schema, get_event_type_updates, and get_event_type_schemas lack test coverage. While there are 8 comprehensive tests for the async versions of these methods, there are no corresponding tests for the sync client. The repository shows a pattern of comprehensive testing for the async client but lacks tests for the sync client entirely. Consider adding test coverage for the sync client methods to match the async client test coverage.

Copilot uses AI. Check for mistakes.
Comment on lines +658 to +692
Copy link
Copy Markdown
Contributor

@chrisdoehring chrisdoehring Mar 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this indicative of v1 EOL?
And you might want to use the VERSION_2_0 constant for these.


def _get_objects_count(self, params):
params = params.copy()
params["page"] = 1
Expand Down Expand Up @@ -1544,6 +1580,42 @@ async def patch_event_type(self, event_type, version=DEFAULT_VERSION):
# self.logger.debug('Result of event type patch is: %s', result)
return result

async def get_event_type_schema(self, event_type_value, pre_render=False):
"""
Get the schema for a single event type (v2 API).

:param event_type_value: The event type 'value' identifier (e.g. 'rainfall_rep')
:param pre_render: If True, return the pre-rendered schema
"""
return await self._get(
f'eventtypes/{event_type_value}/schema',
base_url=self._api_root(VERSION_2_0),
params={'pre_render': pre_render},
)

async def get_event_type_updates(self, event_type_value):
"""
Get the revision history for a single event type (v2 API).

:param event_type_value: The event type 'value' identifier
"""
return await self._get(
f'eventtypes/{event_type_value}/updates',
base_url=self._api_root(VERSION_2_0),
)

async def get_event_type_schemas(self, pre_render=False):
"""
Get schemas for all event types in bulk (v2 API).

:param pre_render: If True, return pre-rendered schemas
"""
return await self._get(
'eventtypes/schemas',
base_url=self._api_root(VERSION_2_0),
params={'pre_render': pre_render},
)

async def get_subjectgroups(
self,
include_inactive=False,
Expand Down Expand Up @@ -1646,38 +1718,6 @@ async def _get_data(self, endpoint, params, batch_size=0):
async def _get(self, path, base_url=None, params=None):
return await self._call(path=path, payload=None, method="GET", params=params, base_url=base_url)

async def _delete(self, path):
"""Issue DELETE request. Returns True on success; raises ERClient* on error."""
try:
auth_headers = await self.auth_headers()
except httpx.HTTPStatusError as e:
self._handle_http_status_error(path, "DELETE", e)
headers = {'User-Agent': self.user_agent, **auth_headers}
if not path.startswith('http'):
path = self._er_url(path)
try:
response = await self._http_session.delete(path, headers=headers)
except httpx.RequestError as e:
reason = str(e)
self.logger.error('Request to ER failed', extra=dict(provider_key=self.provider_key,
url=path,
reason=reason))
raise ERClientException(f'Request to ER failed: {reason}')
if response.is_success:
return True
if response.status_code == 404:
self.logger.error("404 when calling %s", path)
raise ERClientNotFound()
if response.status_code == 403:
try:
reason = response.json().get('status', {}).get('detail', 'unknown reason')
except Exception:
reason = 'unknown reason'
raise ERClientPermissionDenied(reason)
raise ERClientException(
f'Failed to delete: {response.status_code} {response.text}'
)

async def get_file(self, url):
"""
Download a file (e.g. attachment URL). Returns the httpx response; body is read into memory.
Expand Down Expand Up @@ -1722,7 +1762,6 @@ async def _delete(self, path, params=None, base_url=None):
return await self._call(
path=path, payload=None, method="DELETE", params=params, base_url=base_url
)

async def _call(self, path, payload, method, params=None, base_url=None):
try:
auth_headers = await self.auth_headers()
Expand Down
187 changes: 187 additions & 0 deletions tests/async_client/test_get_event_type_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import re

Comment on lines +1 to +2
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The re module is imported but never used in this test file. Consider removing this unused import.

Suggested change
import re

Copilot uses AI. Check for mistakes.
import httpx
import pytest
import respx

from erclient import ERClientException, ERClientNotFound


V2_SERVICE_ROOT = "https://fake-site.erdomain.org/api/v2.0"


@pytest.fixture
def schema_response():
return {
"type": "object",
"properties": {
"amount_mm": {"type": "number", "title": "Amount (mm)"},
"height_m": {"type": "number", "title": "Height (m)"},
},
}


@pytest.fixture
def schema_error_response():
return {
"errors": [
{"code": "schema_not_found", "detail": "No schema found for event type."}
]
}


@pytest.fixture
def schemas_bulk_response():
return {
"count": 2,
"results": [
{
"status": "success",
"event_type": "rainfall_rep",
"schema": {"type": "object", "properties": {}},
},
{
"status": "success",
"event_type": "fire_rep",
"schema": {"type": "object", "properties": {}},
},
],
}


@pytest.fixture
def updates_response():
return {
"count": 2,
"next": None,
"previous": None,
"results": [
{
"sequence": 2,
"data": {"display": "Rainfall v2"},
"created_at": "2025-01-15T10:00:00Z",
},
{
"sequence": 1,
"data": {"display": "Rainfall"},
"created_at": "2025-01-10T10:00:00Z",
},
],
}


@pytest.mark.asyncio
async def test_get_event_type_schema_success(er_client, schema_response):
async with respx.mock(
base_url=V2_SERVICE_ROOT, assert_all_called=False
) as respx_mock:
route = respx_mock.get("/eventtypes/rainfall_rep/schema").mock(
return_value=httpx.Response(httpx.codes.OK, json={"data": schema_response})
)
result = await er_client.get_event_type_schema("rainfall_rep")
assert route.called
assert result == schema_response
await er_client.close()


@pytest.mark.asyncio
async def test_get_event_type_schema_with_pre_render(er_client, schema_response):
async with respx.mock(
base_url=V2_SERVICE_ROOT, assert_all_called=False
) as respx_mock:
route = respx_mock.get("/eventtypes/rainfall_rep/schema").mock(
return_value=httpx.Response(httpx.codes.OK, json={"data": schema_response})
)
result = await er_client.get_event_type_schema("rainfall_rep", pre_render=True)
assert route.called
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable result is not used.

Suggested change
assert route.called
assert route.called
assert result == schema_response

Copilot uses AI. Check for mistakes.
# verify the pre_render param was sent
request = route.calls[0].request
assert "pre_render" in str(request.url)
await er_client.close()


@pytest.mark.asyncio
async def test_get_event_type_schema_not_found(er_client, schema_error_response):
async with respx.mock(
base_url=V2_SERVICE_ROOT, assert_all_called=False
) as respx_mock:
route = respx_mock.get("/eventtypes/nonexistent/schema").mock(
return_value=httpx.Response(httpx.codes.NOT_FOUND, json=schema_error_response)
)
with pytest.raises(ERClientNotFound):
await er_client.get_event_type_schema("nonexistent")
assert route.called
await er_client.close()


@pytest.mark.asyncio
async def test_get_event_type_updates_success(er_client, updates_response):
async with respx.mock(
base_url=V2_SERVICE_ROOT, assert_all_called=False
) as respx_mock:
route = respx_mock.get("/eventtypes/rainfall_rep/updates").mock(
return_value=httpx.Response(httpx.codes.OK, json=updates_response)
)
result = await er_client.get_event_type_updates("rainfall_rep")
assert route.called
# response has no 'data' key, so the full dict is returned
assert result["count"] == 2
assert len(result["results"]) == 2
await er_client.close()


@pytest.mark.asyncio
async def test_get_event_type_updates_not_found(er_client):
async with respx.mock(
base_url=V2_SERVICE_ROOT, assert_all_called=False
) as respx_mock:
route = respx_mock.get("/eventtypes/nonexistent/updates").mock(
return_value=httpx.Response(httpx.codes.NOT_FOUND, json={})
)
with pytest.raises(ERClientNotFound):
await er_client.get_event_type_updates("nonexistent")
assert route.called
await er_client.close()


@pytest.mark.asyncio
async def test_get_event_type_schemas_bulk_success(er_client, schemas_bulk_response):
async with respx.mock(
base_url=V2_SERVICE_ROOT, assert_all_called=False
) as respx_mock:
route = respx_mock.get("/eventtypes/schemas").mock(
return_value=httpx.Response(httpx.codes.OK, json=schemas_bulk_response)
)
result = await er_client.get_event_type_schemas()
assert route.called
assert result["count"] == 2
assert len(result["results"]) == 2
await er_client.close()


@pytest.mark.asyncio
async def test_get_event_type_schemas_bulk_with_pre_render(er_client, schemas_bulk_response):
async with respx.mock(
base_url=V2_SERVICE_ROOT, assert_all_called=False
) as respx_mock:
route = respx_mock.get("/eventtypes/schemas").mock(
return_value=httpx.Response(httpx.codes.OK, json=schemas_bulk_response)
)
result = await er_client.get_event_type_schemas(pre_render=True)
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable result is not used.

Suggested change
result = await er_client.get_event_type_schemas(pre_render=True)
await er_client.get_event_type_schemas(pre_render=True)

Copilot uses AI. Check for mistakes.
assert route.called
request = route.calls[0].request
assert "pre_render" in str(request.url)
await er_client.close()


@pytest.mark.asyncio
async def test_get_event_type_schema_connect_timeout(er_client):
async with respx.mock(
base_url=V2_SERVICE_ROOT, assert_all_called=False
) as respx_mock:
route = respx_mock.get("/eventtypes/rainfall_rep/schema")
route.side_effect = httpx.ConnectTimeout
with pytest.raises(ERClientException):
await er_client.get_event_type_schema("rainfall_rep")
assert route.called
await er_client.close()