diff --git a/docs/config-schema.md b/docs/config-schema.md index 63ddb91..51efc99 100644 --- a/docs/config-schema.md +++ b/docs/config-schema.md @@ -80,6 +80,36 @@ Note pratiche per `ckan`: - se il portale restituisce un file URL in `http://`, il toolkit lo forza automaticamente a `https://` - se `filename` non e dichiarato, il toolkit prova a inferire l'estensione dall'URL risolto +Esempio `sdmx`: + +```yaml +raw: + sources: + - name: popolazione_residente + type: sdmx + client: + timeout: 60 + retries: 2 + args: + agency: IT1 + flow: 22_289 + version: "1.5" + filters: + FREQ: A + REF_AREA: "001001" + DATA_TYPE: JAN + SEX: "9" + AGE: TOTAL + MARITAL_STATUS: "99" +``` + +Note pratiche per `sdmx`: + +- la `version` e' obbligatoria e deve coincidere con la versione corrente esposta dal dataflow +- non esiste fallback silenzioso a `latest` +- in v1 i `filters` sono supportati solo sulle dimensioni di serie, non su `TIME_PERIOD` +- il filtro temporale va applicato nel layer `clean.sql` (per esempio `WHERE TIME_PERIOD = '2024'`), non in `raw.sources[].args.filters` +- il plugin restituisce un CSV normalizzato con colonne `DIM`, `DIM_label` e `value` ## clean | Campo | Tipo | Default | diff --git a/smoke/README.md b/smoke/README.md index ccdc7ce..273690b 100644 --- a/smoke/README.md +++ b/smoke/README.md @@ -20,6 +20,7 @@ Progetti inclusi: - `smoke/zip_http_csv`: `http_file` + extractor ZIP (`unzip_first_csv`) contro server locale - `smoke/bdap_http_csv`: `http_file` contro CSV pubblico BDAP - `smoke/bdap_ckan_csv`: `ckan` contro OpenBDAP, con fallback `package_show` e force `https` +- `smoke/istat_sdmx_22_289`: `sdmx` contro flow ISTAT reale `22_289` - `smoke/finanze_http_zip_2023`: `http_file` contro ZIP pubblico reale, best-effort Ogni progetto include: diff --git a/smoke/istat_sdmx_22_289/README.md b/smoke/istat_sdmx_22_289/README.md new file mode 100644 index 0000000..3cc9e9a --- /dev/null +++ b/smoke/istat_sdmx_22_289/README.md @@ -0,0 +1,24 @@ +# istat_sdmx_22_289 + +Smoke manuale per `sdmx` su dataflow ISTAT `22_289`. + +## Comandi + +```bash +toolkit run raw --config dataset.yml +toolkit profile raw --config dataset.yml +toolkit run clean --config dataset.yml +toolkit run mart --config dataset.yml +toolkit status --dataset istat_sdmx_22_289 --year 2024 --latest --config dataset.yml +``` + +## Verifiche attese + +- `./_smoke_out/data/raw/istat_sdmx_22_289/2024/manifest.json` +- `./_smoke_out/data/raw/istat_sdmx_22_289/2024/raw_validation.json` +- `./_smoke_out/data/clean/istat_sdmx_22_289/2024/metadata.json` +- `./_smoke_out/data/mart/istat_sdmx_22_289/2024/mart_ok.parquet` + +## Nota + +Questo smoke usa un flow ISTAT reale e richiede che la versione `1.5` sia ancora quella esposta dal dataflow `22_289`. diff --git a/smoke/istat_sdmx_22_289/dataset.yml b/smoke/istat_sdmx_22_289/dataset.yml new file mode 100644 index 0000000..4a79a80 --- /dev/null +++ b/smoke/istat_sdmx_22_289/dataset.yml @@ -0,0 +1,37 @@ +root: "./_smoke_out" + +dataset: + name: "istat_sdmx_22_289" + years: [2024] + +raw: + sources: + - name: "popolazione_residente" + type: "sdmx" + args: + agency: "IT1" + flow: "22_289" + version: "1.5" + filters: + FREQ: "A" + REF_AREA: "001001" + DATA_TYPE: "JAN" + SEX: "9" + AGE: "TOTAL" + MARITAL_STATUS: "99" + filename: "istat_popolazione_residente_22_289.csv" + +clean: + sql: "sql/clean.sql" + read_mode: "strict" + read: + mode: "explicit" + delim: "," + encoding: "utf-8" + header: true + validate: {} + +mart: + tables: + - name: "mart_ok" + sql: "sql/mart/mart_ok.sql" diff --git a/smoke/istat_sdmx_22_289/sql/clean.sql b/smoke/istat_sdmx_22_289/sql/clean.sql new file mode 100644 index 0000000..19434cb --- /dev/null +++ b/smoke/istat_sdmx_22_289/sql/clean.sql @@ -0,0 +1,7 @@ +select + TIME_PERIOD, + REF_AREA, + REF_AREA_label, + try_cast(value as bigint) as residenti +from raw_input +where TIME_PERIOD = '2024' diff --git a/smoke/istat_sdmx_22_289/sql/mart/mart_ok.sql b/smoke/istat_sdmx_22_289/sql/mart/mart_ok.sql new file mode 100644 index 0000000..f8c1c17 --- /dev/null +++ b/smoke/istat_sdmx_22_289/sql/mart/mart_ok.sql @@ -0,0 +1,6 @@ +select + TIME_PERIOD, + REF_AREA, + REF_AREA_label, + residenti +from clean_input diff --git a/tests/test_raw_ext_inference.py b/tests/test_raw_ext_inference.py index c6c1285..a588bd0 100644 --- a/tests/test_raw_ext_inference.py +++ b/tests/test_raw_ext_inference.py @@ -25,6 +25,8 @@ def test_infer_ext_ckan_uses_resolved_origin(): assert _infer_ext("ckan", {}, origin="https://example.org/archive.zip.php") == ".zip" +def test_infer_ext_sdmx_is_csv(): + assert _infer_ext("sdmx", {"flow": "22_289"}) == ".csv" def test_infer_ext_never_returns_php(): assert _infer_ext("http_file", {"url": "https://example.org/download.php?id=42"}) != ".php" assert _infer_ext("local_file", {"path": "C:/tmp/file.php"}) != ".php" diff --git a/tests/test_registry.py b/tests/test_registry.py index 146777b..e75d8b1 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -22,5 +22,6 @@ def test_register_builtin_plugins_registers_present_plugins(): plugins = r.list_plugins() assert "ckan" in plugins + assert "sdmx" in plugins assert "http_file" in plugins assert "local_file" in plugins diff --git a/tests/test_sdmx_plugin.py b/tests/test_sdmx_plugin.py new file mode 100644 index 0000000..a8184b4 --- /dev/null +++ b/tests/test_sdmx_plugin.py @@ -0,0 +1,154 @@ +from toolkit.core.exceptions import DownloadError +from toolkit.plugins.sdmx import SdmxSource + + +class _FakeResponse: + def __init__(self, status_code: int, text: str, url: str): + self.status_code = status_code + self.text = text + self.url = url + + +DATAFLOW_XML = """ + + + + + + + + + + + +""" + +DATA_JSON = """ +{ + "dataSets": [ + { + "series": { + "0:0:0:0:0:0": { + "observations": { + "0": [2634], + "1": [2621] + } + } + } + } + ], + "structure": { + "dimensions": { + "series": [ + {"id": "FREQ", "values": [{"id": "A", "name": "annual"}]}, + {"id": "REF_AREA", "values": [{"id": "001001", "name": "Agliè"}]}, + {"id": "DATA_TYPE", "values": [{"id": "JAN", "name": "population on 1st January"}]}, + {"id": "SEX", "values": [{"id": "9", "name": "total"}]}, + {"id": "AGE", "values": [{"id": "TOTAL", "name": "total"}]}, + {"id": "MARITAL_STATUS", "values": [{"id": "99", "name": "total"}]} + ], + "observation": [ + { + "id": "TIME_PERIOD", + "values": [ + {"id": "2024", "name": "2024"}, + {"id": "2025", "name": "2025"} + ] + } + ] + } + } +} +""" + +PREVIEW_JSON = """ +{ + "dataSets": [{"series": {}}], + "structure": { + "dimensions": { + "series": [ + {"id": "FREQ"}, + {"id": "REF_AREA"}, + {"id": "DATA_TYPE"}, + {"id": "SEX"}, + {"id": "AGE"}, + {"id": "MARITAL_STATUS"} + ] + } + } +} +""" + + +def test_sdmx_fetch_normalizes_csv(monkeypatch): + calls = [] + + def _fake_get(url, params=None, timeout=None, headers=None): + calls.append((url, params, headers.get("Accept") if headers else None)) + if url.endswith("/dataflow/IT1/22_289"): + return _FakeResponse(200, DATAFLOW_XML, url) + if url.endswith("/data/IT1,22_289,1.5/all"): + return _FakeResponse(200, PREVIEW_JSON, url) + if url.endswith("/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99"): + return _FakeResponse(200, DATA_JSON, url) + raise AssertionError(f"Unexpected URL {url}") + + monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get) + + payload, origin = SdmxSource().fetch( + "IT1", + "22_289", + "1.5", + { + "FREQ": "A", + "REF_AREA": "001001", + "DATA_TYPE": "JAN", + "SEX": "9", + "AGE": "TOTAL", + "MARITAL_STATUS": "99", + }, + ) + + text = payload.decode("utf-8") + assert origin.endswith("/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99") + assert "FREQ,FREQ_label" in text + assert "A,annual" in text + assert "001001,Agliè" in text + assert "2024,2024,2634" in text + assert any(call[2] == "application/json" for call in calls) + assert any(call[1] == {"firstNObservations": "0"} for call in calls) + + +def test_sdmx_fetch_blocks_version_mismatch(monkeypatch): + def _fake_get(url, params=None, timeout=None, headers=None): + if url.endswith("/dataflow/IT1/22_289"): + return _FakeResponse(200, DATAFLOW_XML, url) + raise AssertionError(f"Unexpected URL {url}") + + monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get) + + try: + SdmxSource().fetch("IT1", "22_289", "1.0", {"FREQ": "A"}) + except DownloadError as exc: + assert "current version is 1.5" in str(exc) + else: + raise AssertionError("Expected DownloadError") + + +def test_sdmx_fetch_rejects_unknown_filter_dimension(monkeypatch): + def _fake_get(url, params=None, timeout=None, headers=None): + if url.endswith("/dataflow/IT1/22_289"): + return _FakeResponse(200, DATAFLOW_XML, url) + if url.endswith("/data/IT1,22_289,1.5/all"): + return _FakeResponse(200, PREVIEW_JSON, url) + raise AssertionError(f"Unexpected URL {url}") + + monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get) + + try: + SdmxSource().fetch("IT1", "22_289", "1.5", {"TIME_PERIOD": "2024"}) + except DownloadError as exc: + assert "Unknown SDMX filter dimensions" in str(exc) + else: + raise AssertionError("Expected DownloadError") diff --git a/toolkit/core/registry.py b/toolkit/core/registry.py index 438868e..53d977c 100644 --- a/toolkit/core/registry.py +++ b/toolkit/core/registry.py @@ -19,13 +19,14 @@ def __init__(self): def register(self, name: str, factory: Callable[..., Any], *, overwrite: bool = False) -> None: if not overwrite and name in self._plugins: - raise ValueError(f"Plugin già registrato: '{name}'") + raise ValueError(f"Plugin già registrato: '{name}'") self._plugins[name] = factory def decorator(self, name: str, *, overwrite: bool = False): def _wrap(factory: Callable[..., Any]): self.register(name, factory, overwrite=overwrite) return factory + return _wrap def create(self, name: str, **kwargs): @@ -51,6 +52,13 @@ def clear(self) -> None: "optional": False, "factory": lambda cls: (lambda **client: cls(**client)), }, + { + "name": "sdmx", + "module": "toolkit.plugins.sdmx", + "class_name": "SdmxSource", + "optional": False, + "factory": lambda cls: (lambda **client: cls(**client)), + }, { "name": "http_file", "module": "toolkit.plugins.http_file", diff --git a/toolkit/plugins/__init__.py b/toolkit/plugins/__init__.py index c7accfb..cea154e 100644 --- a/toolkit/plugins/__init__.py +++ b/toolkit/plugins/__init__.py @@ -7,4 +7,5 @@ - `local_file` - `http_file` - `ckan` +- `sdmx` """ diff --git a/toolkit/plugins/sdmx.py b/toolkit/plugins/sdmx.py new file mode 100644 index 0000000..f9b4542 --- /dev/null +++ b/toolkit/plugins/sdmx.py @@ -0,0 +1,261 @@ +from __future__ import annotations + +import csv +import io +import json +import xml.etree.ElementTree as ET + +import requests + +from toolkit.core.exceptions import DownloadError + +SDMX_NS = { + "mes": "http://www.sdmx.org/resources/sdmxml/schemas/v2_1/message", + "str": "http://www.sdmx.org/resources/sdmxml/schemas/v2_1/structure", +} + + +def _safe_text(value: str | None) -> str: + return (value or "").strip() + + +def _normalize_base_url(url: str) -> str: + return url.rstrip("/") + + +def _flow_ref(agency: str, flow: str, version: str) -> str: + return f"{agency},{flow},{version}" + + +class SdmxSource: + """Fetch SDMX data as a normalized CSV payload.""" + + def __init__( + self, + timeout: int = 60, + retries: int = 2, + user_agent: str | None = None, + data_base_url: str | None = None, + metadata_base_url: str | None = None, + ): + self.timeout = timeout + self.retries = retries + self.user_agent = user_agent or "dataciviclab-toolkit/0.1" + self.data_base_url = _normalize_base_url( + data_base_url or "https://esploradati.istat.it/SDMXWS/rest" + ) + self.metadata_base_url = _normalize_base_url( + metadata_base_url or "https://sdmx.istat.it/SDMXWS/rest" + ) + + def _get_text( + self, + base_url: str, + path: str, + *, + accept: str | None = None, + params: dict[str, str] | None = None, + ) -> tuple[str, str]: + headers = {"User-Agent": self.user_agent} + if accept: + headers["Accept"] = accept + + url = f"{_normalize_base_url(base_url)}/{path.lstrip('/')}" + last_err: Exception | None = None + for _ in range(max(1, self.retries)): + try: + response = requests.get(url, params=params, timeout=self.timeout, headers=headers) + if response.status_code != 200: + raise DownloadError(f"HTTP {response.status_code} for {response.url}") + return response.text, response.url + except Exception as exc: + last_err = exc + raise DownloadError(str(last_err) if last_err else f"Failed to fetch {url}") + + def _get_json( + self, + base_url: str, + path: str, + *, + params: dict[str, str] | None = None, + ) -> tuple[dict, str]: + text, origin = self._get_text( + base_url, + path, + accept="application/json", + params=params, + ) + try: + return json.loads(text), origin + except json.JSONDecodeError as exc: + raise DownloadError(f"Invalid SDMX JSON payload from {origin}") from exc + + def _get_dataflow(self, agency: str, flow: str) -> ET.Element: + xml_text, _origin = self._get_text(self.metadata_base_url, f"dataflow/{agency}/{flow}") + try: + return ET.fromstring(xml_text) + except ET.ParseError as exc: + raise DownloadError(f"Invalid SDMX XML metadata for flow={flow}") from exc + + def _current_version(self, root: ET.Element) -> str: + dataflow = root.find(".//str:Dataflow", SDMX_NS) + if dataflow is None: + raise DownloadError("SDMX dataflow not found") + + structure_ref = dataflow.find(".//str:Structure/Ref", SDMX_NS) + if structure_ref is None: + raise DownloadError("SDMX dataflow missing Structure/Ref") + + version = _safe_text(structure_ref.attrib.get("version")) + return version + + def _preview_dimensions(self, agency: str, flow: str, version: str) -> list[str]: + flow_ref = _flow_ref(agency, flow, version) + payload, _origin = self._get_json( + self.data_base_url, + f"data/{flow_ref}/all", + params={"firstNObservations": "0"}, + ) + structure = payload.get("structure") or {} + dimensions = (structure.get("dimensions") or {}).get("series") or [] + result: list[str] = [] + for dim in dimensions: + dim_id = str(dim.get("id") or "") + if dim_id: + result.append(dim_id) + if not result: + raise DownloadError( + f"SDMX structure preview returned no series dimensions for {flow_ref}" + ) + return result + + def _build_key(self, dimensions: list[str], filters: dict | None) -> str: + filters = filters or {} + unknown = sorted(set(filters.keys()) - set(dimensions)) + if unknown: + raise DownloadError( + "Unknown SDMX filter dimensions: " + ", ".join(unknown) + ) + + if not dimensions: + return "all" + + parts: list[str] = [] + for dim in dimensions: + value = filters.get(dim) + if value is None: + parts.append("") + continue + if isinstance(value, (list, tuple)): + token = "+".join(str(item) for item in value) + else: + token = str(value) + parts.append(token) + + key = ".".join(parts).rstrip(".") + return key or "all" + + def _dimension_value(self, dimension: dict, index_token: str) -> tuple[str, str]: + values = dimension.get("values") or [] + idx = int(index_token) + if idx >= len(values): + raise DownloadError( + f"SDMX dimension index {idx} out of range for {dimension.get('id')}" + ) + entry = values[idx] + code = str(entry.get("id") or "") + label = str(entry.get("name") or code) + return code, label + + def _normalize_rows(self, payload: dict) -> tuple[list[str], list[dict[str, object]]]: + structure = payload.get("structure") or {} + dimensions = structure.get("dimensions") or {} + series_dims = dimensions.get("series") or [] + observation_dims = dimensions.get("observation") or [] + + header: list[str] = [] + for dim in series_dims: + dim_id = str(dim.get("id") or "") + if dim_id: + header.extend([dim_id, f"{dim_id}_label"]) + for dim in observation_dims: + dim_id = str(dim.get("id") or "") + if dim_id: + header.extend([dim_id, f"{dim_id}_label"]) + header.append("value") + + rows: list[dict[str, object]] = [] + for dataset in payload.get("dataSets") or []: + for series_key, series_val in (dataset.get("series") or {}).items(): + series_parts = series_key.split(":") if series_key else [] + series_ctx: dict[str, object] = {} + for idx, token in enumerate(series_parts): + if idx >= len(series_dims): + continue + dim = series_dims[idx] + dim_id = str(dim.get("id") or "") + code, label = self._dimension_value(dim, token) + series_ctx[dim_id] = code + series_ctx[f"{dim_id}_label"] = label + + for obs_key, obs_val in (series_val.get("observations") or {}).items(): + row = dict(series_ctx) + obs_parts = obs_key.split(":") if obs_key else [] + for idx, token in enumerate(obs_parts): + if idx >= len(observation_dims): + continue + dim = observation_dims[idx] + dim_id = str(dim.get("id") or "") + code, label = self._dimension_value(dim, token) + row[dim_id] = code + row[f"{dim_id}_label"] = label + + if isinstance(obs_val, list) and obs_val: + row["value"] = obs_val[0] + else: + row["value"] = obs_val + rows.append(row) + + return header, rows + + def _rows_to_csv(self, header: list[str], rows: list[dict[str, object]]) -> bytes: + buffer = io.StringIO(newline="") + writer = csv.DictWriter(buffer, fieldnames=header) + writer.writeheader() + for row in rows: + writer.writerow({col: row.get(col) for col in header}) + return buffer.getvalue().encode("utf-8") + + def fetch( + self, + agency: str, + flow: str, + version: str, + filters: dict | None = None, + ) -> tuple[bytes, str]: + agency = _safe_text(agency) or "IT1" + flow = _safe_text(flow) + version = _safe_text(version) + + if not flow: + raise DownloadError("SDMX source requires flow") + if not version: + raise DownloadError("SDMX source requires version") + + dataflow_root = self._get_dataflow(agency, flow) + current_version = self._current_version(dataflow_root) + + if current_version != version: + raise DownloadError( + f"Requested SDMX version {version} for {agency}/{flow} is not available; " + f"current version is {current_version}" + ) + + flow_ref = _flow_ref(agency, flow, version) + dimensions = self._preview_dimensions(agency, flow, version) + key = self._build_key(dimensions, filters) + payload, origin = self._get_json(self.data_base_url, f"data/{flow_ref}/{key}") + header, rows = self._normalize_rows(payload) + if not rows: + raise DownloadError(f"SDMX data returned no rows for {agency}/{flow} and key={key}") + return self._rows_to_csv(header, rows), origin diff --git a/toolkit/raw/run.py b/toolkit/raw/run.py index 8cebe22..7c9c224 100644 --- a/toolkit/raw/run.py +++ b/toolkit/raw/run.py @@ -25,6 +25,8 @@ def _format_args(args: dict, year: int) -> dict: def _infer_ext(stype: str, formatted_args: dict, origin: str | None = None) -> str: + if stype == "sdmx": + return ".csv" if stype in {"http_file", "ckan"}: url = origin or formatted_args.get("url", "") parsed = urlparse(url) @@ -75,6 +77,13 @@ def _fetch_payload(stype: str, client: dict, formatted_args: dict) -> tuple[byte str(formatted_args["resource_id"]) if formatted_args.get("resource_id") is not None else None, str(formatted_args["dataset_id"]) if formatted_args.get("dataset_id") is not None else None, ) + elif stype == "sdmx": + payload, origin = src.fetch( + str(formatted_args.get("agency") or "IT1"), + str(formatted_args["flow"]), + str(formatted_args["version"]), + formatted_args.get("filters"), + ) elif stype == "http_file": payload = src.fetch(formatted_args["url"]) origin = formatted_args["url"]