Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 76 additions & 32 deletions erclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,44 @@ def get_sources(self, page_size=100):
def get_users(self):
return self._get('users')

# -- V2 Schema Methods --

def get_schema(self, schema_name):
"""
Get a dynamic schema from the v2 API.
:param schema_name: Name of the schema resource (e.g. 'users', 'sources', 'subjects',
'choices', 'spatial_features', 'event_types')
:return: schema dict (JSON Schema)
"""
return self._get(
f'schemas/{schema_name}.json',
base_url=self._api_root('v2.0'),
)

def get_users_schema(self):
"""Get the users JSON schema from the v2 API."""
return self.get_schema('users')

def get_sources_schema(self):
"""Get the sources JSON schema from the v2 API."""
return self.get_schema('sources')

def get_subjects_schema(self):
"""Get the subjects JSON schema from the v2 API."""
return self.get_schema('subjects')

def get_choices_schema(self):
"""Get the choices JSON schema from the v2 API."""
return self.get_schema('choices')

def get_spatial_features_schema(self):
"""Get the spatial_features JSON schema from the v2 API."""
return self.get_schema('spatial_features')

def get_event_types_schema(self):
"""Get the event_types JSON schema from the v2 API."""
return self.get_schema('event_types')


class AsyncERClient(object):
"""
Expand Down Expand Up @@ -1613,6 +1651,44 @@ async def get_feature_group(self, feature_group_id: str):
"""
return await self._get(f"spatialfeaturegroup/{feature_group_id}", params={})

# -- V2 Schema Methods --

async def get_schema(self, schema_name):
"""
Get a dynamic schema from the v2 API.
:param schema_name: Name of the schema resource (e.g. 'users', 'sources', 'subjects',
'choices', 'spatial_features', 'event_types')
:return: schema dict (JSON Schema)
"""
return await self._get(
f'schemas/{schema_name}.json',
base_url=self._api_root('v2.0'),
)

async def get_users_schema(self):
"""Get the users JSON schema from the v2 API."""
return await self.get_schema('users')

async def get_sources_schema(self):
"""Get the sources JSON schema from the v2 API."""
return await self.get_schema('sources')

async def get_subjects_schema(self):
"""Get the subjects JSON schema from the v2 API."""
return await self.get_schema('subjects')

async def get_choices_schema(self):
"""Get the choices JSON schema from the v2 API."""
return await self.get_schema('choices')

async def get_spatial_features_schema(self):
"""Get the spatial_features JSON schema from the v2 API."""
return await self.get_schema('spatial_features')

async def get_event_types_schema(self):
"""Get the event_types JSON schema from the v2 API."""
return await self.get_schema('event_types')

async def _get_data(self, endpoint, params, batch_size=0):
if "page" not in params: # Use cursor paginator unless the user has specified a page
params["use_cursor"] = "true"
Expand Down Expand Up @@ -1646,38 +1722,6 @@ async def _get_data(self, endpoint, params, batch_size=0):
async def _get(self, path, base_url=None, params=None):
return await self._call(path=path, payload=None, method="GET", params=params, base_url=base_url)

async def _delete(self, path):
"""Issue DELETE request. Returns True on success; raises ERClient* on error."""
try:
auth_headers = await self.auth_headers()
except httpx.HTTPStatusError as e:
self._handle_http_status_error(path, "DELETE", e)
headers = {'User-Agent': self.user_agent, **auth_headers}
if not path.startswith('http'):
path = self._er_url(path)
try:
response = await self._http_session.delete(path, headers=headers)
except httpx.RequestError as e:
reason = str(e)
self.logger.error('Request to ER failed', extra=dict(provider_key=self.provider_key,
url=path,
reason=reason))
raise ERClientException(f'Request to ER failed: {reason}')
if response.is_success:
return True
if response.status_code == 404:
self.logger.error("404 when calling %s", path)
raise ERClientNotFound()
if response.status_code == 403:
try:
reason = response.json().get('status', {}).get('detail', 'unknown reason')
except Exception:
reason = 'unknown reason'
raise ERClientPermissionDenied(reason)
raise ERClientException(
f'Failed to delete: {response.status_code} {response.text}'
)

async def get_file(self, url):
"""
Download a file (e.g. attachment URL). Returns the httpx response; body is read into memory.
Expand Down
134 changes: 134 additions & 0 deletions tests/async_client/test_v2_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import httpx
import pytest
import respx

from erclient import ERClientNotFound, ERClientPermissionDenied


SCHEMA_NAMES = ['users', 'sources', 'subjects', 'choices', 'spatial_features', 'event_types']


Comment on lines +8 to +10
@pytest.fixture
def sample_schema():
return {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Users",
"type": "object",
"properties": {
"id": {"type": "string", "format": "uuid"},
"username": {"type": "string"},
"email": {"type": "string", "format": "email"},
},
"required": ["username"],
}


@pytest.mark.asyncio
async def test_get_schema_generic(er_client, sample_schema):
"""Test the generic get_schema() method with an arbitrary schema name."""
v2_root = er_client._api_root('v2.0')
async with respx.mock(assert_all_called=False) as respx_mock:
route = respx_mock.get(f"{v2_root}/schemas/users.json")
route.return_value = httpx.Response(httpx.codes.OK, json={"data": sample_schema})
result = await er_client.get_schema('users')
assert route.called
assert result == sample_schema
await er_client.close()


@pytest.mark.asyncio
async def test_get_schema_not_found(er_client):
"""Test that a 404 for a non-existent schema raises ERClientNotFound."""
v2_root = er_client._api_root('v2.0')
async with respx.mock(assert_all_called=False) as respx_mock:
route = respx_mock.get(f"{v2_root}/schemas/nonexistent.json")
route.return_value = httpx.Response(httpx.codes.NOT_FOUND, json={"status": {"code": 404}})
with pytest.raises(ERClientNotFound):
await er_client.get_schema('nonexistent')
assert route.called
await er_client.close()


@pytest.mark.asyncio
async def test_get_schema_forbidden(er_client):
"""Test that a 403 for a schema endpoint raises ERClientPermissionDenied."""
v2_root = er_client._api_root('v2.0')
async with respx.mock(assert_all_called=False) as respx_mock:
route = respx_mock.get(f"{v2_root}/schemas/users.json")
route.return_value = httpx.Response(httpx.codes.FORBIDDEN, json={"status": {"code": 403}})
with pytest.raises(ERClientPermissionDenied):
await er_client.get_schema('users')
assert route.called
await er_client.close()


@pytest.mark.asyncio
async def test_get_users_schema(er_client, sample_schema):
v2_root = er_client._api_root('v2.0')
async with respx.mock(assert_all_called=False) as respx_mock:
route = respx_mock.get(f"{v2_root}/schemas/users.json")
route.return_value = httpx.Response(httpx.codes.OK, json={"data": sample_schema})
result = await er_client.get_users_schema()
assert route.called
assert result == sample_schema
await er_client.close()


@pytest.mark.asyncio
async def test_get_sources_schema(er_client, sample_schema):
v2_root = er_client._api_root('v2.0')
async with respx.mock(assert_all_called=False) as respx_mock:
route = respx_mock.get(f"{v2_root}/schemas/sources.json")
route.return_value = httpx.Response(httpx.codes.OK, json={"data": sample_schema})
result = await er_client.get_sources_schema()
assert route.called
assert result == sample_schema
await er_client.close()


@pytest.mark.asyncio
async def test_get_subjects_schema(er_client, sample_schema):
v2_root = er_client._api_root('v2.0')
async with respx.mock(assert_all_called=False) as respx_mock:
route = respx_mock.get(f"{v2_root}/schemas/subjects.json")
route.return_value = httpx.Response(httpx.codes.OK, json={"data": sample_schema})
result = await er_client.get_subjects_schema()
assert route.called
assert result == sample_schema
await er_client.close()


@pytest.mark.asyncio
async def test_get_choices_schema(er_client, sample_schema):
v2_root = er_client._api_root('v2.0')
async with respx.mock(assert_all_called=False) as respx_mock:
route = respx_mock.get(f"{v2_root}/schemas/choices.json")
route.return_value = httpx.Response(httpx.codes.OK, json={"data": sample_schema})
result = await er_client.get_choices_schema()
assert route.called
assert result == sample_schema
await er_client.close()


@pytest.mark.asyncio
async def test_get_spatial_features_schema(er_client, sample_schema):
v2_root = er_client._api_root('v2.0')
async with respx.mock(assert_all_called=False) as respx_mock:
route = respx_mock.get(f"{v2_root}/schemas/spatial_features.json")
route.return_value = httpx.Response(httpx.codes.OK, json={"data": sample_schema})
result = await er_client.get_spatial_features_schema()
assert route.called
assert result == sample_schema
await er_client.close()


@pytest.mark.asyncio
async def test_get_event_types_schema(er_client, sample_schema):
v2_root = er_client._api_root('v2.0')
async with respx.mock(assert_all_called=False) as respx_mock:
route = respx_mock.get(f"{v2_root}/schemas/event_types.json")
route.return_value = httpx.Response(httpx.codes.OK, json={"data": sample_schema})
result = await er_client.get_event_types_schema()
assert route.called
assert result == sample_schema
await er_client.close()
134 changes: 134 additions & 0 deletions tests/sync_client/test_v2_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import json
from unittest.mock import MagicMock

import pytest

from erclient import ERClientNotFound, ERClientPermissionDenied


def _mock_response(status_code=200, json_data=None):
"""Helper to create a mock response object."""
response = MagicMock()
response.ok = 200 <= status_code < 400
response.status_code = status_code
response.text = json.dumps(json_data) if json_data else ""
response.json.return_value = json_data
response.url = "https://fake-site.erdomain.org/api/v2.0/schemas/test"
return response


@pytest.fixture
def sample_schema():
return {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Users",
"type": "object",
"properties": {
"id": {"type": "string", "format": "uuid"},
"username": {"type": "string"},
"email": {"type": "string", "format": "email"},
},
"required": ["username"],
}


# -- _api_root() method --


def test_api_root_v2(er_client):
"""Verify the v2 root is derived correctly via _api_root()."""
assert er_client._api_root('v2.0') == "https://fake-site.erdomain.org/api/v2.0"


# -- Generic get_schema --


def test_get_schema_generic(er_client, sample_schema):
er_client._http_session.get = MagicMock(
return_value=_mock_response(200, {"data": sample_schema})
)
result = er_client.get_schema("users")
er_client._http_session.get.assert_called_once()
# Verify the URL used the v2 root
call_args = er_client._http_session.get.call_args
assert "/api/v2.0/schemas/users.json" in call_args[0][0]
assert result == sample_schema


def test_get_schema_not_found(er_client):
er_client._http_session.get = MagicMock(
return_value=_mock_response(404, {"status": {"code": 404}})
)
with pytest.raises(ERClientNotFound):
er_client.get_schema("nonexistent")


def test_get_schema_forbidden(er_client):
er_client._http_session.get = MagicMock(
return_value=_mock_response(403, {"status": {"detail": "Forbidden"}})
)
with pytest.raises(ERClientPermissionDenied):
er_client.get_schema("users")


# -- Individual schema convenience methods --


def test_get_users_schema(er_client, sample_schema):
er_client._http_session.get = MagicMock(
return_value=_mock_response(200, {"data": sample_schema})
)
result = er_client.get_users_schema()
call_url = er_client._http_session.get.call_args[0][0]
assert "/api/v2.0/schemas/users.json" in call_url
assert result == sample_schema


def test_get_sources_schema(er_client, sample_schema):
er_client._http_session.get = MagicMock(
return_value=_mock_response(200, {"data": sample_schema})
)
result = er_client.get_sources_schema()
call_url = er_client._http_session.get.call_args[0][0]
assert "/api/v2.0/schemas/sources.json" in call_url
assert result == sample_schema


def test_get_subjects_schema(er_client, sample_schema):
er_client._http_session.get = MagicMock(
return_value=_mock_response(200, {"data": sample_schema})
)
result = er_client.get_subjects_schema()
call_url = er_client._http_session.get.call_args[0][0]
assert "/api/v2.0/schemas/subjects.json" in call_url
assert result == sample_schema


def test_get_choices_schema(er_client, sample_schema):
er_client._http_session.get = MagicMock(
return_value=_mock_response(200, {"data": sample_schema})
)
result = er_client.get_choices_schema()
call_url = er_client._http_session.get.call_args[0][0]
assert "/api/v2.0/schemas/choices.json" in call_url
assert result == sample_schema


def test_get_spatial_features_schema(er_client, sample_schema):
er_client._http_session.get = MagicMock(
return_value=_mock_response(200, {"data": sample_schema})
)
result = er_client.get_spatial_features_schema()
call_url = er_client._http_session.get.call_args[0][0]
assert "/api/v2.0/schemas/spatial_features.json" in call_url
assert result == sample_schema


def test_get_event_types_schema(er_client, sample_schema):
er_client._http_session.get = MagicMock(
return_value=_mock_response(200, {"data": sample_schema})
)
result = er_client.get_event_types_schema()
call_url = er_client._http_session.get.call_args[0][0]
assert "/api/v2.0/schemas/event_types.json" in call_url
assert result == sample_schema
Loading