Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 163 additions & 1 deletion tests/test_sdmx_plugin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from toolkit.core.exceptions import DownloadError
from toolkit.plugins.sdmx import SdmxSource
import requests


class _FakeResponse:
Expand Down Expand Up @@ -96,7 +97,7 @@ def _fake_get(url, params=None, timeout=None, headers=None):

monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)

payload, origin = SdmxSource().fetch(
payload, origin = SdmxSource(retries=1).fetch(
"IT1",
"22_289",
"1.5",
Expand Down Expand Up @@ -152,3 +153,164 @@ def _fake_get(url, params=None, timeout=None, headers=None):
assert "Unknown SDMX filter dimensions" in str(exc)
else:
raise AssertionError("Expected DownloadError")


def test_sdmx_fetch_falls_back_on_metadata_timeout(monkeypatch):
    """A timeout on the primary metadata host must fall back to the alternate host."""
    seen = []

    # URLs the alternate (esploradati) host answers successfully.
    ok_bodies = {
        "https://esploradati.istat.it/SDMXWS/rest/dataflow/IT1/22_289": DATAFLOW_XML,
        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all": PREVIEW_JSON,
        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99": DATA_JSON,
    }

    def _fake_get(url, params=None, timeout=None, headers=None):
        seen.append(url)
        # The primary metadata host never answers: simulate a network timeout.
        if url == "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289":
            raise requests.exceptions.Timeout("metadata timeout")
        body = ok_bodies.get(url)
        if body is None:
            raise AssertionError(f"Unexpected URL {url}")
        return _FakeResponse(200, body, url)

    monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)

    payload, origin = SdmxSource(retries=1).fetch(
        "IT1",
        "22_289",
        "1.5",
        {
            "FREQ": "A",
            "REF_AREA": "001001",
            "DATA_TYPE": "JAN",
            "SEX": "9",
            "AGE": "TOTAL",
            "MARITAL_STATUS": "99",
        },
    )

    assert origin.endswith("/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99")
    assert payload.decode("utf-8").startswith("FREQ,FREQ_label")
    # The first two requests must be: primary metadata (times out), then fallback.
    assert seen[:2] == [
        "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289",
        "https://esploradati.istat.it/SDMXWS/rest/dataflow/IT1/22_289",
    ]


def test_sdmx_fetch_falls_back_on_data_5xx(monkeypatch):
    """A 5xx from the primary data host must fall back to the alternate data host."""
    seen = []

    # Map each expected URL to the (status, body) the fake server returns:
    # the esploradati data endpoints fail with 500, the sdmx ones succeed.
    replies = {
        "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289": (200, DATAFLOW_XML),
        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all": (500, "boom"),
        "https://sdmx.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all": (200, PREVIEW_JSON),
        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99": (500, "boom"),
        "https://sdmx.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99": (200, DATA_JSON),
    }

    def _fake_get(url, params=None, timeout=None, headers=None):
        seen.append(url)
        try:
            status, body = replies[url]
        except KeyError:
            raise AssertionError(f"Unexpected URL {url}")
        return _FakeResponse(status, body, url)

    monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)

    payload, origin = SdmxSource(
        retries=1,
        data_base_url="https://esploradati.istat.it/SDMXWS/rest",
        metadata_base_url="https://sdmx.istat.it/SDMXWS/rest",
    ).fetch(
        "IT1",
        "22_289",
        "1.5",
        {
            "FREQ": "A",
            "REF_AREA": "001001",
            "DATA_TYPE": "JAN",
            "SEX": "9",
            "AGE": "TOTAL",
            "MARITAL_STATUS": "99",
        },
    )

    # The final payload must have come from the fallback (sdmx) data host.
    assert origin == "https://sdmx.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99"
    assert payload.decode("utf-8").startswith("FREQ,FREQ_label")


def test_sdmx_fetch_does_not_fallback_on_404(monkeypatch):
    """An HTTP 404 on the data query must surface immediately, with no host fallback."""
    replies = {
        "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289": (200, DATAFLOW_XML),
        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all": (200, PREVIEW_JSON),
        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99": (404, "not found"),
    }

    def _fake_get(url, params=None, timeout=None, headers=None):
        try:
            status, body = replies[url]
        except KeyError:
            raise AssertionError(f"Unexpected URL {url}")
        return _FakeResponse(status, body, url)

    monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)

    source = SdmxSource(
        retries=1,
        data_base_url="https://esploradati.istat.it/SDMXWS/rest",
        metadata_base_url="https://sdmx.istat.it/SDMXWS/rest",
    )
    filters = {
        "FREQ": "A",
        "REF_AREA": "001001",
        "DATA_TYPE": "JAN",
        "SEX": "9",
        "AGE": "TOTAL",
        "MARITAL_STATUS": "99",
    }
    try:
        source.fetch("IT1", "22_289", "1.5", filters)
    except DownloadError as exc:
        assert "HTTP 404" in str(exc)
    else:
        raise AssertionError("Expected DownloadError")


def test_sdmx_fetch_does_not_fallback_on_connection_error(monkeypatch):
    """A connection error on the data query must not trigger a host fallback."""
    seen = []

    replies = {
        "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289": (200, DATAFLOW_XML),
        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all": (200, PREVIEW_JSON),
    }

    def _fake_get(url, params=None, timeout=None, headers=None):
        seen.append(url)
        # The keyed data request fails at the transport layer.
        if url == "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99":
            raise requests.exceptions.ConnectionError("tls handshake failed")
        try:
            status, body = replies[url]
        except KeyError:
            raise AssertionError(f"Unexpected URL {url}")
        return _FakeResponse(status, body, url)

    monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)

    source = SdmxSource(
        retries=1,
        data_base_url="https://esploradati.istat.it/SDMXWS/rest",
        metadata_base_url="https://sdmx.istat.it/SDMXWS/rest",
    )
    filters = {
        "FREQ": "A",
        "REF_AREA": "001001",
        "DATA_TYPE": "JAN",
        "SEX": "9",
        "AGE": "TOTAL",
        "MARITAL_STATUS": "99",
    }
    try:
        source.fetch("IT1", "22_289", "1.5", filters)
    except DownloadError as exc:
        assert "connection error" in str(exc).lower()
    else:
        raise AssertionError("Expected DownloadError")

    # Exactly three requests: no retry against the alternate data host.
    assert seen == [
        "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289",
        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all",
        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99",
    ]
88 changes: 77 additions & 11 deletions toolkit/plugins/sdmx.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io
import json
import xml.etree.ElementTree as ET
from typing import Iterable

import requests

Expand All @@ -27,6 +28,10 @@ def _flow_ref(agency: str, flow: str, version: str) -> str:
return f"{agency},{flow},{version}"


# ISTAT SDMX REST hosts. For agency "IT1" each one serves as the fallback
# candidate for the other (see SdmxSource._candidate_base_urls).
ISTAT_SDMX_BASE = "https://sdmx.istat.it/SDMXWS/rest"
ISTAT_ESPLORADATI_BASE = "https://esploradati.istat.it/SDMXWS/rest"


class SdmxSource:
"""Fetch SDMX data as a normalized CSV payload."""

Expand All @@ -42,12 +47,53 @@ def __init__(
self.retries = retries
self.user_agent = user_agent or "dataciviclab-toolkit/0.1"
self.data_base_url = _normalize_base_url(
data_base_url or "https://esploradati.istat.it/SDMXWS/rest"
data_base_url or ISTAT_ESPLORADATI_BASE
)
self.metadata_base_url = _normalize_base_url(
metadata_base_url or "https://sdmx.istat.it/SDMXWS/rest"
metadata_base_url or ISTAT_SDMX_BASE
)

def _candidate_base_urls(self, agency: str, primary: str, alternate: str) -> list[str]:
normalized_primary = _normalize_base_url(primary)
if agency != "IT1":
return [normalized_primary]

urls = [normalized_primary]
normalized_alternate = _normalize_base_url(alternate)
if normalized_alternate not in urls:
urls.append(normalized_alternate)
return urls

def _metadata_base_urls(self, agency: str) -> list[str]:
return self._candidate_base_urls(agency, self.metadata_base_url, ISTAT_ESPLORADATI_BASE)

def _data_base_urls(self, agency: str) -> list[str]:
return self._candidate_base_urls(agency, self.data_base_url, ISTAT_SDMX_BASE)

def _is_retryable_fallback_error(self, exc: DownloadError) -> bool:
text = str(exc).lower()
return "endpoint timeout" in text or "endpoint error (http 5" in text

def _get_text_from_candidates(
self,
base_urls: Iterable[str],
path: str,
*,
accept: str | None = None,
params: dict[str, str] | None = None,
) -> tuple[str, str]:
base_url_list = list(base_urls)
last_err: DownloadError | None = None
for idx, base_url in enumerate(base_url_list):
try:
return self._get_text(base_url, path, accept=accept, params=params)
except DownloadError as exc:
last_err = exc
has_more = idx < len(base_url_list) - 1
if not has_more or not self._is_retryable_fallback_error(exc):
raise
raise last_err or DownloadError("Failed to fetch SDMX resource")

def _get_text(
self,
base_url: str,
Expand All @@ -66,21 +112,38 @@ def _get_text(
try:
response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
if response.status_code != 200:
raise DownloadError(f"HTTP {response.status_code} for {response.url}")
if response.status_code == 404:
raise DownloadError(
f"SDMX query not found (HTTP 404) for {response.url}"
)
if 500 <= response.status_code <= 599:
raise DownloadError(
f"SDMX endpoint error (HTTP {response.status_code}) for {response.url}"
)
raise DownloadError(f"SDMX HTTP {response.status_code} for {response.url}")
return response.text, response.url
except requests.exceptions.Timeout as exc:
last_err = DownloadError(f"SDMX endpoint timeout for {url}: {exc}")
except requests.exceptions.ConnectionError as exc:
last_err = DownloadError(
f"SDMX endpoint connection error for {url}: {exc}"
)
except Exception as exc:
last_err = exc
raise DownloadError(str(last_err) if last_err else f"Failed to fetch {url}")
if isinstance(exc, DownloadError):
last_err = exc
else:
last_err = DownloadError(str(exc))
raise last_err or DownloadError(f"Failed to fetch {url}")

def _get_json(
self,
base_url: str,
base_urls: Iterable[str],
path: str,
*,
params: dict[str, str] | None = None,
) -> tuple[dict, str]:
text, origin = self._get_text(
base_url,
text, origin = self._get_text_from_candidates(
base_urls,
path,
accept="application/json",
params=params,
Expand All @@ -91,7 +154,10 @@ def _get_json(
raise DownloadError(f"Invalid SDMX JSON payload from {origin}") from exc

def _get_dataflow(self, agency: str, flow: str) -> ET.Element:
xml_text, _origin = self._get_text(self.metadata_base_url, f"dataflow/{agency}/{flow}")
xml_text, _origin = self._get_text_from_candidates(
self._metadata_base_urls(agency),
f"dataflow/{agency}/{flow}",
)
try:
return ET.fromstring(xml_text)
except ET.ParseError as exc:
Expand All @@ -112,7 +178,7 @@ def _current_version(self, root: ET.Element) -> str:
def _preview_dimensions(self, agency: str, flow: str, version: str) -> list[str]:
flow_ref = _flow_ref(agency, flow, version)
payload, _origin = self._get_json(
self.data_base_url,
self._data_base_urls(agency),
f"data/{flow_ref}/all",
params={"firstNObservations": "0"},
)
Expand Down Expand Up @@ -254,7 +320,7 @@ def fetch(
flow_ref = _flow_ref(agency, flow, version)
dimensions = self._preview_dimensions(agency, flow, version)
key = self._build_key(dimensions, filters)
payload, origin = self._get_json(self.data_base_url, f"data/{flow_ref}/{key}")
payload, origin = self._get_json(self._data_base_urls(agency), f"data/{flow_ref}/{key}")
header, rows = self._normalize_rows(payload)
if not rows:
raise DownloadError(f"SDMX data returned no rows for {agency}/{flow} and key={key}")
Expand Down
Loading