# Patch summary (commentary only; patch tools skip text before the first
# "diff --git" line).
#
# toolkit/plugins/sdmx.py
#   * Adds ISTAT_SDMX_BASE and ISTAT_ESPLORADATI_BASE and uses them as the
#     default metadata / data base URLs of SdmxSource.
#   * For agency "IT1" only, every request gets a second candidate base URL
#     (the other ISTAT host); any other agency keeps a single candidate.
#   * _get_text_from_candidates() tries the candidates in order and moves on
#     to the next one only when the DownloadError message contains
#     "endpoint timeout" or "endpoint error (http 5" (compared in lower case).
#     Everything else (404, other HTTP statuses, connection errors) is
#     re-raised immediately, as is the error from the last candidate.
#   * _get_text() wraps non-200 statuses, timeouts, connection errors and any
#     other exception in DownloadError with a category-specific message.
#   * _get_json(), _get_dataflow(), _preview_dimensions() and fetch() now pass
#     a list of candidate base URLs instead of a single base URL.
#
# tests/test_sdmx_plugin.py
#   * An existing fetch test now constructs SdmxSource(retries=1).
#   * New tests: fallback on metadata timeout, fallback on data HTTP 500,
#     no fallback on HTTP 404, no fallback on a connection error.
#
# NOTE(review): fallback eligibility is decided by substring-matching the
# messages built in _get_text(); rewording those messages silently changes
# which errors trigger the fallback.
# NOTE(review): line breaks and indentation below were reconstructed from a
# whitespace-collapsed copy of this patch. Added/removed line counts were
# checked against every "@@" header, but the indentation of context lines and
# the exact position of blank context lines are inferred from Python syntax;
# confirm against the real files before applying.
diff --git a/tests/test_sdmx_plugin.py b/tests/test_sdmx_plugin.py
index a8184b4..9bc8643 100644
--- a/tests/test_sdmx_plugin.py
+++ b/tests/test_sdmx_plugin.py
@@ -1,5 +1,6 @@
 from toolkit.core.exceptions import DownloadError
 from toolkit.plugins.sdmx import SdmxSource
+import requests
 
 
 class _FakeResponse:
@@ -96,7 +97,7 @@ def _fake_get(url, params=None, timeout=None, headers=None):
 
     monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)
 
-    payload, origin = SdmxSource().fetch(
+    payload, origin = SdmxSource(retries=1).fetch(
         "IT1",
         "22_289",
         "1.5",
@@ -152,3 +153,164 @@ def _fake_get(url, params=None, timeout=None, headers=None):
         assert "Unknown SDMX filter dimensions" in str(exc)
     else:
         raise AssertionError("Expected DownloadError")
+
+
+def test_sdmx_fetch_falls_back_on_metadata_timeout(monkeypatch):
+    calls = []
+
+    def _fake_get(url, params=None, timeout=None, headers=None):
+        calls.append(url)
+        if url == "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289":
+            raise requests.exceptions.Timeout("metadata timeout")
+        if url == "https://esploradati.istat.it/SDMXWS/rest/dataflow/IT1/22_289":
+            return _FakeResponse(200, DATAFLOW_XML, url)
+        if url == "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all":
+            return _FakeResponse(200, PREVIEW_JSON, url)
+        if url == "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99":
+            return _FakeResponse(200, DATA_JSON, url)
+        raise AssertionError(f"Unexpected URL {url}")
+
+    monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)
+
+    payload, origin = SdmxSource(retries=1).fetch(
+        "IT1",
+        "22_289",
+        "1.5",
+        {
+            "FREQ": "A",
+            "REF_AREA": "001001",
+            "DATA_TYPE": "JAN",
+            "SEX": "9",
+            "AGE": "TOTAL",
+            "MARITAL_STATUS": "99",
+        },
+    )
+
+    assert origin.endswith("/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99")
+    assert payload.decode("utf-8").startswith("FREQ,FREQ_label")
+    assert calls[:2] == [
+        "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289",
+        "https://esploradati.istat.it/SDMXWS/rest/dataflow/IT1/22_289",
+    ]
+
+
+def test_sdmx_fetch_falls_back_on_data_5xx(monkeypatch):
+    calls = []
+
+    def _fake_get(url, params=None, timeout=None, headers=None):
+        calls.append(url)
+        if url == "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289":
+            return _FakeResponse(200, DATAFLOW_XML, url)
+        if url == "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all":
+            return _FakeResponse(500, "boom", url)
+        if url == "https://sdmx.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all":
+            return _FakeResponse(200, PREVIEW_JSON, url)
+        if url == "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99":
+            return _FakeResponse(500, "boom", url)
+        if url == "https://sdmx.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99":
+            return _FakeResponse(200, DATA_JSON, url)
+        raise AssertionError(f"Unexpected URL {url}")
+
+    monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)
+
+    payload, origin = SdmxSource(
+        retries=1,
+        data_base_url="https://esploradati.istat.it/SDMXWS/rest",
+        metadata_base_url="https://sdmx.istat.it/SDMXWS/rest",
+    ).fetch(
+        "IT1",
+        "22_289",
+        "1.5",
+        {
+            "FREQ": "A",
+            "REF_AREA": "001001",
+            "DATA_TYPE": "JAN",
+            "SEX": "9",
+            "AGE": "TOTAL",
+            "MARITAL_STATUS": "99",
+        },
+    )
+
+    assert origin == "https://sdmx.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99"
+    assert payload.decode("utf-8").startswith("FREQ,FREQ_label")
+
+
+def test_sdmx_fetch_does_not_fallback_on_404(monkeypatch):
+    def _fake_get(url, params=None, timeout=None, headers=None):
+        if url == "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289":
+            return _FakeResponse(200, DATAFLOW_XML, url)
+        if url == "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all":
+            return _FakeResponse(200, PREVIEW_JSON, url)
+        if url == "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99":
+            return _FakeResponse(404, "not found", url)
+        raise AssertionError(f"Unexpected URL {url}")
+
+    monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)
+
+    try:
+        SdmxSource(
+            retries=1,
+            data_base_url="https://esploradati.istat.it/SDMXWS/rest",
+            metadata_base_url="https://sdmx.istat.it/SDMXWS/rest",
+        ).fetch(
+            "IT1",
+            "22_289",
+            "1.5",
+            {
+                "FREQ": "A",
+                "REF_AREA": "001001",
+                "DATA_TYPE": "JAN",
+                "SEX": "9",
+                "AGE": "TOTAL",
+                "MARITAL_STATUS": "99",
+            },
+        )
+    except DownloadError as exc:
+        assert "HTTP 404" in str(exc)
+    else:
+        raise AssertionError("Expected DownloadError")
+
+
+def test_sdmx_fetch_does_not_fallback_on_connection_error(monkeypatch):
+    calls = []
+
+    def _fake_get(url, params=None, timeout=None, headers=None):
+        calls.append(url)
+        if url == "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289":
+            return _FakeResponse(200, DATAFLOW_XML, url)
+        if url == "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all":
+            return _FakeResponse(200, PREVIEW_JSON, url)
+        if url == "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99":
+            raise requests.exceptions.ConnectionError("tls handshake failed")
+        raise AssertionError(f"Unexpected URL {url}")
+
+    monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get)
+
+    try:
+        SdmxSource(
+            retries=1,
+            data_base_url="https://esploradati.istat.it/SDMXWS/rest",
+            metadata_base_url="https://sdmx.istat.it/SDMXWS/rest",
+        ).fetch(
+            "IT1",
+            "22_289",
+            "1.5",
+            {
+                "FREQ": "A",
+                "REF_AREA": "001001",
+                "DATA_TYPE": "JAN",
+                "SEX": "9",
+                "AGE": "TOTAL",
+                "MARITAL_STATUS": "99",
+            },
+        )
+    except DownloadError as exc:
+        assert "connection error" in str(exc).lower()
+    else:
+        raise AssertionError("Expected DownloadError")
+
+    assert calls == [
+        "https://sdmx.istat.it/SDMXWS/rest/dataflow/IT1/22_289",
+        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/all",
+        "https://esploradati.istat.it/SDMXWS/rest/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99",
+    ]
diff --git a/toolkit/plugins/sdmx.py b/toolkit/plugins/sdmx.py
index f9b4542..c8eb4c1 100644
--- a/toolkit/plugins/sdmx.py
+++ b/toolkit/plugins/sdmx.py
@@ -4,6 +4,7 @@
 import io
 import json
 import xml.etree.ElementTree as ET
+from typing import Iterable
 
 import requests
 
@@ -27,6 +28,10 @@ def _flow_ref(agency: str, flow: str, version: str) -> str:
     return f"{agency},{flow},{version}"
 
 
+ISTAT_SDMX_BASE = "https://sdmx.istat.it/SDMXWS/rest"
+ISTAT_ESPLORADATI_BASE = "https://esploradati.istat.it/SDMXWS/rest"
+
+
 class SdmxSource:
     """Fetch SDMX data as a normalized CSV payload."""
 
@@ -42,12 +47,53 @@ def __init__(
         self.retries = retries
         self.user_agent = user_agent or "dataciviclab-toolkit/0.1"
         self.data_base_url = _normalize_base_url(
-            data_base_url or "https://esploradati.istat.it/SDMXWS/rest"
+            data_base_url or ISTAT_ESPLORADATI_BASE
         )
         self.metadata_base_url = _normalize_base_url(
-            metadata_base_url or "https://sdmx.istat.it/SDMXWS/rest"
+            metadata_base_url or ISTAT_SDMX_BASE
         )
 
+    def _candidate_base_urls(self, agency: str, primary: str, alternate: str) -> list[str]:
+        normalized_primary = _normalize_base_url(primary)
+        if agency != "IT1":
+            return [normalized_primary]
+
+        urls = [normalized_primary]
+        normalized_alternate = _normalize_base_url(alternate)
+        if normalized_alternate not in urls:
+            urls.append(normalized_alternate)
+        return urls
+
+    def _metadata_base_urls(self, agency: str) -> list[str]:
+        return self._candidate_base_urls(agency, self.metadata_base_url, ISTAT_ESPLORADATI_BASE)
+
+    def _data_base_urls(self, agency: str) -> list[str]:
+        return self._candidate_base_urls(agency, self.data_base_url, ISTAT_SDMX_BASE)
+
+    def _is_retryable_fallback_error(self, exc: DownloadError) -> bool:
+        text = str(exc).lower()
+        return "endpoint timeout" in text or "endpoint error (http 5" in text
+
+    def _get_text_from_candidates(
+        self,
+        base_urls: Iterable[str],
+        path: str,
+        *,
+        accept: str | None = None,
+        params: dict[str, str] | None = None,
+    ) -> tuple[str, str]:
+        base_url_list = list(base_urls)
+        last_err: DownloadError | None = None
+        for idx, base_url in enumerate(base_url_list):
+            try:
+                return self._get_text(base_url, path, accept=accept, params=params)
+            except DownloadError as exc:
+                last_err = exc
+                has_more = idx < len(base_url_list) - 1
+                if not has_more or not self._is_retryable_fallback_error(exc):
+                    raise
+        raise last_err or DownloadError("Failed to fetch SDMX resource")
+
     def _get_text(
         self,
         base_url: str,
@@ -66,21 +112,38 @@ def _get_text(
             try:
                 response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
                 if response.status_code != 200:
-                    raise DownloadError(f"HTTP {response.status_code} for {response.url}")
+                    if response.status_code == 404:
+                        raise DownloadError(
+                            f"SDMX query not found (HTTP 404) for {response.url}"
+                        )
+                    if 500 <= response.status_code <= 599:
+                        raise DownloadError(
+                            f"SDMX endpoint error (HTTP {response.status_code}) for {response.url}"
+                        )
+                    raise DownloadError(f"SDMX HTTP {response.status_code} for {response.url}")
                 return response.text, response.url
+            except requests.exceptions.Timeout as exc:
+                last_err = DownloadError(f"SDMX endpoint timeout for {url}: {exc}")
+            except requests.exceptions.ConnectionError as exc:
+                last_err = DownloadError(
+                    f"SDMX endpoint connection error for {url}: {exc}"
+                )
             except Exception as exc:
-                last_err = exc
-        raise DownloadError(str(last_err) if last_err else f"Failed to fetch {url}")
+                if isinstance(exc, DownloadError):
+                    last_err = exc
+                else:
+                    last_err = DownloadError(str(exc))
+        raise last_err or DownloadError(f"Failed to fetch {url}")
 
     def _get_json(
         self,
-        base_url: str,
+        base_urls: Iterable[str],
         path: str,
         *,
         params: dict[str, str] | None = None,
     ) -> tuple[dict, str]:
-        text, origin = self._get_text(
-            base_url,
+        text, origin = self._get_text_from_candidates(
+            base_urls,
             path,
             accept="application/json",
             params=params,
@@ -91,7 +154,10 @@ def _get_json(
             raise DownloadError(f"Invalid SDMX JSON payload from {origin}") from exc
 
     def _get_dataflow(self, agency: str, flow: str) -> ET.Element:
-        xml_text, _origin = self._get_text(self.metadata_base_url, f"dataflow/{agency}/{flow}")
+        xml_text, _origin = self._get_text_from_candidates(
+            self._metadata_base_urls(agency),
+            f"dataflow/{agency}/{flow}",
+        )
         try:
             return ET.fromstring(xml_text)
         except ET.ParseError as exc:
@@ -112,7 +178,7 @@ def _current_version(self, root: ET.Element) -> str:
     def _preview_dimensions(self, agency: str, flow: str, version: str) -> list[str]:
         flow_ref = _flow_ref(agency, flow, version)
         payload, _origin = self._get_json(
-            self.data_base_url,
+            self._data_base_urls(agency),
             f"data/{flow_ref}/all",
             params={"firstNObservations": "0"},
         )
@@ -254,7 +320,7 @@ def fetch(
         flow_ref = _flow_ref(agency, flow, version)
         dimensions = self._preview_dimensions(agency, flow, version)
         key = self._build_key(dimensions, filters)
-        payload, origin = self._get_json(self.data_base_url, f"data/{flow_ref}/{key}")
+        payload, origin = self._get_json(self._data_base_urls(agency), f"data/{flow_ref}/{key}")
         header, rows = self._normalize_rows(payload)
         if not rows:
             raise DownloadError(f"SDMX data returned no rows for {agency}/{flow} and key={key}")