From 1c4e955b1a3074a4c9b0e0d47e78e149836b1c04 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Sat, 28 Mar 2026 21:12:26 +0000 Subject: [PATCH 1/7] feat: add ckan source plugin --- docs/config-schema.md | 23 ++++++ tests/test_ckan_plugin.py | 95 +++++++++++++++++++++ tests/test_raw_ext_inference.py | 30 +++++++ tests/test_registry.py | 1 + toolkit/core/registry.py | 7 ++ toolkit/plugins/__init__.py | 1 + toolkit/plugins/ckan.py | 142 ++++++++++++++++++++++++++++++++ toolkit/raw/run.py | 16 ++-- 8 files changed, 310 insertions(+), 5 deletions(-) create mode 100644 tests/test_ckan_plugin.py create mode 100644 toolkit/plugins/ckan.py diff --git a/docs/config-schema.md b/docs/config-schema.md index 8787c00..63ddb91 100644 --- a/docs/config-schema.md +++ b/docs/config-schema.md @@ -57,6 +57,29 @@ I path relativi sono sempre risolti rispetto alla directory che contiene `datase `raw.sources[].args` e `raw.extractor.args` devono essere sempre oggetti YAML, non liste o stringhe. +Esempio `ckan`: + +```yaml +raw: + sources: + - name: bdap_lea + type: ckan + client: + timeout: 60 + retries: 2 + args: + portal_url: https://bdap-opendata.rgs.mef.gov.it/SpodCkanApi/api/3 + dataset_id: "d598ebd9-949d-4214-bb33-cd9c1be08f15" + resource_id: "33344" +``` + +Note pratiche per `ckan`: + +- il toolkit interroga `resource_show` prima del download +- se `resource_show` non e disponibile o non risolve il file, il toolkit ripiega su `package_show` +- se il portale restituisce un file URL in `http://`, il toolkit lo forza automaticamente a `https://` +- se `filename` non e dichiarato, il toolkit prova a inferire l'estensione dall'URL risolto + ## clean | Campo | Tipo | Default | diff --git a/tests/test_ckan_plugin.py b/tests/test_ckan_plugin.py new file mode 100644 index 0000000..9a76ff3 --- /dev/null +++ b/tests/test_ckan_plugin.py @@ -0,0 +1,95 @@ +from toolkit.core.exceptions import DownloadError +from toolkit.plugins.ckan import CkanSource + + +class _FakeResponse: + def __init__(self, status_code: int, *, json_data=None, content: bytes = b"", url: str = "https://example.org"): + self.status_code = status_code + self._json_data = json_data + self.content = content + self.url = url + + def json(self): + return self._json_data + + +def test_ckan_fetch_resource_show_forces_https(monkeypatch): + calls = [] + + def _fake_get(url, params=None, timeout=None, headers=None): + calls.append((url, params)) + if "resource_show" in url: + return _FakeResponse( + 200, + json_data={ + "success": True, + "result": {"url": "http://portal.example.org/export/data.csv"}, + }, + url=f"{url}?id=abc", + ) + return _FakeResponse( + 200, + content=b"a,b\n1,2\n", + url="https://portal.example.org/export/data.csv", + ) + + monkeypatch.setattr("toolkit.plugins.ckan.requests.get", _fake_get) + + payload, origin = CkanSource().fetch("https://portal.example.org/api/3", resource_id="abc") + + assert payload == b"a,b\n1,2\n" + assert origin == "https://portal.example.org/export/data.csv" + assert calls[1][0] == "https://portal.example.org/export/data.csv" + + +def test_ckan_fetch_falls_back_to_package_show(monkeypatch): + calls = [] + + def _fake_get(url, params=None, timeout=None, headers=None): + calls.append((url, params)) + if "resource_show" in url: + return _FakeResponse(404, json_data={}, url=f"{url}?id=33344") + if "package_show" in url: + return _FakeResponse( + 200, + json_data={ + "success": True, + "result": { + "resources": [ + { + "id": 33344, + "name": "csv dump", + "format": "CSV", + "url": "http://portal.example.org/api/3/datastore/dump/dataset.csv", + } + ] + }, + }, + url=f"{url}?id=dataset-id", + ) + return _FakeResponse( + 200, + content=b"a,b\n1,2\n", + url="https://portal.example.org/api/3/datastore/dump/dataset.csv", + ) + + monkeypatch.setattr("toolkit.plugins.ckan.requests.get", _fake_get) + + payload, origin = CkanSource().fetch( + "https://portal.example.org/api/3", + resource_id="33344", + dataset_id="dataset-id", + ) + + assert payload == b"a,b\n1,2\n" + assert origin == "https://portal.example.org/api/3/datastore/dump/dataset.csv" + assert any("package_show" in call[0] for call in calls) + + +def test_ckan_fetch_requires_identifier(): + try: + CkanSource().fetch("https://portal.example.org/api/3") + except DownloadError as exc: + assert "resource_id or dataset_id" in str(exc) + else: + raise AssertionError("Expected DownloadError") diff --git a/tests/test_raw_ext_inference.py b/tests/test_raw_ext_inference.py index 4126648..c6c1285 100644 --- a/tests/test_raw_ext_inference.py +++ b/tests/test_raw_ext_inference.py @@ -20,11 +20,41 @@ def test_infer_ext_http_csv_php_and_zip_php(): assert _infer_ext("http_file", {"url": "https://example.org/archive.zip.php"}) == ".zip" +def test_infer_ext_ckan_uses_resolved_origin(): + assert _infer_ext("ckan", {}, origin="https://example.org/dump/data.csv") == ".csv" + assert _infer_ext("ckan", {}, origin="https://example.org/archive.zip.php") == ".zip" + + def test_infer_ext_never_returns_php(): assert _infer_ext("http_file", {"url": "https://example.org/download.php?id=42"}) != ".php" assert _infer_ext("local_file", {"path": "C:/tmp/file.php"}) != ".php" +def test_run_raw_ckan_filename_inferred_from_resolved_url(monkeypatch, tmp_path: Path): + def _fake_fetch_payload(_stype: str, _client: dict, _formatted_args: dict): + return b"a,b\n1,2\n", "https://example.org/data.csv" + + monkeypatch.setattr("toolkit.raw.run._fetch_payload", _fake_fetch_payload) + + raw_cfg = { + "sources": [ + { + "name": "bdap_resource", + "type": "ckan", + "args": { + "portal_url": "https://portal.example.org/SpodCkanApi/api/3", + "resource_id": "33344", + }, + } + ] + } + + run_raw("demo", 2024, str(tmp_path), raw_cfg, _NoopLogger()) + + out_dir = tmp_path / "data" / "raw" / "demo" / "2024" + assert (out_dir / "bdap_resource.csv").exists() + + def test_run_raw_filename_override_has_priority(monkeypatch, tmp_path: Path): def _fake_fetch_payload(_stype: str, _client: dict, _formatted_args: dict): return b"a,b\n1,2\n", "https://example.org/dataset.csv.php" diff --git a/tests/test_registry.py b/tests/test_registry.py index 757e230..146777b 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -21,5 +21,6 @@ def test_register_builtin_plugins_registers_present_plugins(): register_builtin_plugins(registry_obj=r) plugins = r.list_plugins() + assert "ckan" in plugins assert "http_file" in plugins assert "local_file" in plugins diff --git a/toolkit/core/registry.py b/toolkit/core/registry.py index d8bcad5..438868e 100644 --- a/toolkit/core/registry.py +++ b/toolkit/core/registry.py @@ -44,6 +44,13 @@ def clear(self) -> None: registry = Registry() _BUILTIN_PLUGINS: tuple[dict[str, Any], ...] = ( + { + "name": "ckan", + "module": "toolkit.plugins.ckan", + "class_name": "CkanSource", + "optional": False, + "factory": lambda cls: (lambda **client: cls(**client)), + }, { "name": "http_file", "module": "toolkit.plugins.http_file", diff --git a/toolkit/plugins/__init__.py b/toolkit/plugins/__init__.py index f6fc45e..c7accfb 100644 --- a/toolkit/plugins/__init__.py +++ b/toolkit/plugins/__init__.py @@ -6,4 +6,5 @@ - `local_file` - `http_file` +- `ckan` """ diff --git a/toolkit/plugins/ckan.py b/toolkit/plugins/ckan.py new file mode 100644 index 0000000..a89fdb7 --- /dev/null +++ b/toolkit/plugins/ckan.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +from urllib.parse import urlparse, urlunparse + +import requests + +from toolkit.core.exceptions import DownloadError + + +def _normalize_resource_show_url(portal_url: str) -> str: + base = portal_url.rstrip("/") + if base.endswith("/api/3/action"): + return f"{base}/resource_show" + if base.endswith("/api/3"): + return f"{base}/action/resource_show" + return f"{base}/api/3/action/resource_show" + + +def _normalize_package_show_url(portal_url: str) -> str: + base = portal_url.rstrip("/") + if base.endswith("/api/3/action"): + return f"{base}/package_show" + if base.endswith("/api/3"): + return f"{base}/action/package_show" + return f"{base}/api/3/action/package_show" + + +def _force_https(url: str) -> str: + parsed = urlparse(url) + if parsed.scheme == "http": + return urlunparse(parsed._replace(scheme="https")) + return url + + +class CkanSource: + """Resolve a CKAN resource via resource_show, then download its current URL.""" + + def __init__(self, timeout: int = 60, retries: int = 2, user_agent: str | None = None): + self.timeout = timeout + self.retries = retries + self.user_agent = user_agent or "dataciviclab-toolkit/0.1" + + def _get_json(self, url: str, params: dict[str, str]) -> dict: + headers = {"User-Agent": self.user_agent} + last_err: Exception | None = None + for _ in range(max(1, self.retries)): + try: + response = requests.get(url, params=params, timeout=self.timeout, headers=headers) + if response.status_code != 200: + raise DownloadError(f"HTTP {response.status_code} for {response.url}") + data = response.json() + if not data.get("success"): + raise DownloadError(f"CKAN API failed for {response.url}") + return data + except Exception as exc: + last_err = exc + raise DownloadError(str(last_err) if last_err else f"Failed to fetch CKAN metadata from {url}") + + def _download_bytes(self, url: str) -> bytes: + headers = {"User-Agent": self.user_agent} + last_err: Exception | None = None + for _ in range(max(1, self.retries)): + try: + response = requests.get(url, timeout=self.timeout, headers=headers) + if response.status_code != 200: + raise DownloadError(f"HTTP {response.status_code} for {url}") + return response.content + except Exception as exc: + last_err = exc + raise DownloadError(str(last_err) if last_err else f"Failed to fetch {url}") + + def _select_resource_from_package(self, result: dict, resource_id: str | None) -> dict: + resources = result.get("resources") or [] + if not resources: + raise DownloadError("CKAN package_show returned no resources") + + if resource_id: + for item in resources: + if str(item.get("id")) == str(resource_id): + return item + + with_url = [item for item in resources if item.get("url")] + if not with_url: + raise DownloadError("CKAN package_show returned resources without URL") + + def _score(item: dict) -> tuple[int, str]: + fmt = str(item.get("format") or "").lower() + url = str(item.get("url") or "").lower() + if "csv" in fmt or ".csv" in url: + rank = 0 + elif "zip" in fmt or ".zip" in url: + rank = 1 + elif "xlsx" in fmt or ".xlsx" in url or "xls" in fmt or ".xls" in url: + rank = 2 + elif "json" in fmt or ".json" in url: + rank = 3 + elif "xml" in fmt or ".xml" in url: + rank = 4 + else: + rank = 9 + return rank, str(item.get("name") or "") + + return sorted(with_url, key=_score)[0] + + def fetch( + self, + portal_url: str, + resource_id: str | None = None, + dataset_id: str | None = None, + ) -> tuple[bytes, str]: + last_err: Exception | None = None + + if resource_id: + api_url = _normalize_resource_show_url(portal_url) + try: + metadata = self._get_json(api_url, {"id": str(resource_id)}) + result = metadata.get("result") or {} + raw_url = result.get("url") + if raw_url: + resolved_url = _force_https(str(raw_url)) + return self._download_bytes(resolved_url), resolved_url + last_err = DownloadError( + f"CKAN resource_show returned no URL for resource_id={resource_id}" + ) + except Exception as exc: + last_err = exc + + package_identifier = dataset_id or resource_id + if package_identifier: + api_url = _normalize_package_show_url(portal_url) + try: + metadata = self._get_json(api_url, {"id": str(package_identifier)}) + result = metadata.get("result") or {} + resource = self._select_resource_from_package(result, resource_id) + resolved_url = _force_https(str(resource["url"])) + return self._download_bytes(resolved_url), resolved_url + except Exception as exc: + last_err = exc + + if last_err: + raise DownloadError(str(last_err)) + raise DownloadError("CKAN source requires resource_id or dataset_id") diff --git a/toolkit/raw/run.py b/toolkit/raw/run.py index 7ae8b2e..8cebe22 100644 --- a/toolkit/raw/run.py +++ b/toolkit/raw/run.py @@ -24,9 +24,9 @@ def _format_args(args: dict, year: int) -> dict: return formatted -def _infer_ext(stype: str, formatted_args: dict) -> str: - if stype == "http_file": - url = formatted_args.get("url", "") +def _infer_ext(stype: str, formatted_args: dict, origin: str | None = None) -> str: + if stype in {"http_file", "ckan"}: + url = origin or formatted_args.get("url", "") parsed = urlparse(url) path = parsed.path or "" low_path = path.lower() @@ -69,7 +69,13 @@ def _infer_ext(stype: str, formatted_args: dict) -> str: def _fetch_payload(stype: str, client: dict, formatted_args: dict) -> tuple[bytes, str]: src = registry.create(stype, **(client or {})) - if stype == "http_file": + if stype == "ckan": + payload, origin = src.fetch( + formatted_args["portal_url"], + str(formatted_args["resource_id"]) if formatted_args.get("resource_id") is not None else None, + str(formatted_args["dataset_id"]) if formatted_args.get("dataset_id") is not None else None, + ) + elif stype == "http_file": payload = src.fetch(formatted_args["url"]) origin = formatted_args["url"] elif stype == "local_file": @@ -210,7 +216,7 @@ def run_raw( if explicit: extracted = {explicit: payload} else: - ext = _infer_ext(stype, formatted_args) + ext = _infer_ext(stype, formatted_args, origin=origin) extracted = {f"{name}{ext}": payload} # scrittura file From 5482f3813659679464c7ead03eec5286ce16cfbe Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Sat, 28 Mar 2026 21:21:03 +0000 Subject: [PATCH 2/7] docs: add ckan smoke for bdap --- smoke/README.md | 1 + smoke/bdap_ckan_csv/README.md | 28 +++++++++++++++++++++++ smoke/bdap_ckan_csv/dataset.yml | 29 ++++++++++++++++++++++++ smoke/bdap_ckan_csv/sql/clean.sql | 16 +++++++++++++ smoke/bdap_ckan_csv/sql/mart/mart_ok.sql | 10 ++++++++ 5 files changed, 84 insertions(+) create mode 100644 smoke/bdap_ckan_csv/README.md create mode 100644 smoke/bdap_ckan_csv/dataset.yml create mode 100644 smoke/bdap_ckan_csv/sql/clean.sql create mode 100644 smoke/bdap_ckan_csv/sql/mart/mart_ok.sql diff --git a/smoke/README.md b/smoke/README.md index 0f6d35d..b64df55 100644 --- a/smoke/README.md +++ b/smoke/README.md @@ -20,6 +20,7 @@ Progetti inclusi: - `smoke/local_file_csv`: `local_file` completamente offline - `smoke/zip_http_csv`: `http_file` + extractor ZIP (`unzip_first_csv`) contro server locale - `smoke/bdap_http_csv`: `http_file` contro CSV pubblico BDAP +- `smoke/bdap_ckan_csv`: `ckan` contro OpenBDAP, con fallback `package_show` e force `https` - `smoke/finanze_http_zip_2023`: `http_file` contro ZIP pubblico reale, best-effort Ogni progetto include: diff --git a/smoke/bdap_ckan_csv/README.md b/smoke/bdap_ckan_csv/README.md new file mode 100644 index 0000000..ac82f18 --- /dev/null +++ b/smoke/bdap_ckan_csv/README.md @@ -0,0 +1,28 @@ +# bdap_ckan_csv + +Smoke manuale per `ckan` contro OpenBDAP, con fallback `resource_show -> package_show` e force `https`. + +## Comandi + +```bash +toolkit run raw --config dataset.yml +toolkit profile raw --config dataset.yml +toolkit run clean --config dataset.yml +toolkit run mart --config dataset.yml +toolkit status --dataset bdap_ckan_csv --year 2022 --latest --config dataset.yml +``` + +## Verifiche attese + +- `./_smoke_out/data/raw/bdap_ckan_csv/2022/manifest.json` +- `./_smoke_out/data/raw/bdap_ckan_csv/2022/raw_validation.json` +- `./_smoke_out/data/raw/bdap_ckan_csv/2022/_profile/raw_profile.json` +- `./_smoke_out/data/raw/bdap_ckan_csv/2022/_profile/suggested_read.yml` +- `./_smoke_out/data/clean/bdap_ckan_csv/2022/metadata.json` con `read_params_source`, `read_source_used`, `read_params_used` +- `./_smoke_out/data/mart/bdap_ckan_csv/2022/mart_ok.parquet` + +## Note + +- questo smoke usa un dataset OpenBDAP reale +- il portale espone `package_show`, ma `resource_show` non risolve: il caso serve proprio a verificare il fallback del plugin +- l'URL file restituito dal portale puo' arrivare in `http://`: il toolkit lo forza a `https://` prima del download diff --git a/smoke/bdap_ckan_csv/dataset.yml b/smoke/bdap_ckan_csv/dataset.yml new file mode 100644 index 0000000..9af0c3c --- /dev/null +++ b/smoke/bdap_ckan_csv/dataset.yml @@ -0,0 +1,29 @@ +root: "./_smoke_out" + +dataset: + name: "bdap_ckan_csv" + years: [2022] + +raw: + sources: + - type: "ckan" + args: + portal_url: "https://bdap-opendata.rgs.mef.gov.it/SpodCkanApi/api/3" + dataset_id: "d598ebd9-949d-4214-bb33-cd9c1be08f15" + resource_id: "33344" + filename: "bdap_lea_2024.csv" + +clean: + sql: "sql/clean.sql" + read: + delim: ";" + decimal: "." + encoding: "utf-8" + header: true + columns: null + validate: {} + +mart: + tables: + - name: "mart_ok" + sql: "sql/mart/mart_ok.sql" diff --git a/smoke/bdap_ckan_csv/sql/clean.sql b/smoke/bdap_ckan_csv/sql/clean.sql new file mode 100644 index 0000000..c31afca --- /dev/null +++ b/smoke/bdap_ckan_csv/sql/clean.sql @@ -0,0 +1,16 @@ +WITH base AS ( + SELECT + TRY_CAST(TRIM(CAST("Anno di Riferimento" AS VARCHAR)) AS INTEGER) AS anno, + TRY_CAST(TRIM(CAST("Codice Regione" AS VARCHAR)) AS INTEGER) AS codice_regione, + TRIM(CAST("Descrizione Regione" AS VARCHAR)) AS regione, + TRY_CAST(TRIM(CAST("Codice Ente SSN" AS VARCHAR)) AS INTEGER) AS codice_ente_ssn, + TRIM(CAST("Descrizione Ente" AS VARCHAR)) AS descrizione_ente, + TRIM(CAST("Codice Voce Contabile" AS VARCHAR)) AS codice_voce_contabile, + TRIM(CAST("Descrizione Voce Contabile" AS VARCHAR)) AS descrizione_voce_contabile, + TRY_CAST(TRIM(CAST("Importo Totale" AS VARCHAR)) AS DOUBLE) AS importo_totale + FROM raw_input +) + +SELECT * +FROM base +WHERE anno IS NOT NULL; diff --git a/smoke/bdap_ckan_csv/sql/mart/mart_ok.sql b/smoke/bdap_ckan_csv/sql/mart/mart_ok.sql new file mode 100644 index 0000000..32f2b31 --- /dev/null +++ b/smoke/bdap_ckan_csv/sql/mart/mart_ok.sql @@ -0,0 +1,10 @@ +SELECT + anno, + regione, + descrizione_voce_contabile, + COUNT(*) AS righe, + SUM(importo_totale) AS importo_totale +FROM clean_input +WHERE anno IS NOT NULL +GROUP BY 1, 2, 3 +ORDER BY 1, 2, 3 From 6b6bb2fd65212f823a533d91c78e6332339cfdf9 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Sat, 28 Mar 2026 21:23:07 +0000 Subject: [PATCH 3/7] chore: remove redundant ispra smoke --- smoke/README.md | 1 - smoke/ispra_http_csv/README.md | 35 ------------------- smoke/ispra_http_csv/dataset.yml | 31 ---------------- .../fixtures/ispra_http_sample.csv | 3 -- smoke/ispra_http_csv/sql/clean.sql | 1 - smoke/ispra_http_csv/sql/mart/mart_ok.sql | 1 - 6 files changed, 72 deletions(-) delete mode 100644 smoke/ispra_http_csv/README.md delete mode 100644 smoke/ispra_http_csv/dataset.yml delete mode 100644 smoke/ispra_http_csv/fixtures/ispra_http_sample.csv delete mode 100644 smoke/ispra_http_csv/sql/clean.sql delete mode 100644 smoke/ispra_http_csv/sql/mart/mart_ok.sql diff --git a/smoke/README.md b/smoke/README.md index b64df55..ccdc7ce 100644 --- a/smoke/README.md +++ b/smoke/README.md @@ -16,7 +16,6 @@ Di conseguenza: Progetti inclusi: -- `smoke/ispra_http_csv`: `http_file` contro server locale `http.server` - `smoke/local_file_csv`: `local_file` completamente offline - `smoke/zip_http_csv`: `http_file` + extractor ZIP (`unzip_first_csv`) contro server locale - `smoke/bdap_http_csv`: `http_file` contro CSV pubblico BDAP diff --git a/smoke/ispra_http_csv/README.md b/smoke/ispra_http_csv/README.md deleted file mode 100644 index c3522e4..0000000 --- a/smoke/ispra_http_csv/README.md +++ /dev/null @@ -1,35 +0,0 @@ -# ispra_http_csv - -Smoke manuale per `http_file` con CSV servito localmente. - -## Setup - -Avvia un server HTTP dalla directory di questo progetto: - -```bash -py -m http.server 8000 -``` - -In un secondo terminale: - -```bash -toolkit run raw --config dataset.yml -toolkit profile raw --config dataset.yml -toolkit run clean --config dataset.yml -toolkit run mart --config dataset.yml -toolkit status --dataset ispra_http_csv --year 2022 --latest --config dataset.yml -``` - -## Verifiche attese - -- `./_smoke_out/data/raw/ispra_http_csv/2022/manifest.json` -- `./_smoke_out/data/raw/ispra_http_csv/2022/raw_validation.json` -- `./_smoke_out/data/raw/ispra_http_csv/2022/_profile/raw_profile.json` -- `./_smoke_out/data/raw/ispra_http_csv/2022/_profile/suggested_read.yml` -- `./_smoke_out/data/clean/ispra_http_csv/2022/metadata.json` con `read_params_source`, `read_source_used`, `read_params_used` -- `./_smoke_out/data/mart/ispra_http_csv/2022/mart_ok.parquet` - -Controlli contratto: - -- RAW `manifest.json` contiene `primary_output_file` -- CLEAN usa il manifest RAW e scrive `metadata.json` con audit fields di lettura diff --git a/smoke/ispra_http_csv/dataset.yml b/smoke/ispra_http_csv/dataset.yml deleted file mode 100644 index e62568d..0000000 --- a/smoke/ispra_http_csv/dataset.yml +++ /dev/null @@ -1,31 +0,0 @@ -root: "./_smoke_out" - -dataset: - name: "ispra_http_csv" - years: [2022] - -raw: - sources: - - type: "http_file" - args: - url: "https://www.catasto-rifiuti.isprambiente.it/get/getDettaglioComunale.csv.php?&aa={year}" - filename: "ispra_http_sample_{year}.csv" - -clean: - sql: "sql/clean.sql" - read: - encoding: "utf-8" - header: true - decimal: "," - skip: 1 - delim: ";" - null_padding: true - ignore_errors: true - auto_detect: false - strict_mode: false - columns: null - -mart: - tables: - - name: "mart_ok" - sql: "sql/mart/mart_ok.sql" diff --git a/smoke/ispra_http_csv/fixtures/ispra_http_sample.csv b/smoke/ispra_http_csv/fixtures/ispra_http_sample.csv deleted file mode 100644 index 24cf4f5..0000000 --- a/smoke/ispra_http_csv/fixtures/ispra_http_sample.csv +++ /dev/null @@ -1,3 +0,0 @@ -anno,comune,provincia,regione,valore -2022,Agliè,Torino,Piemonte,1 -2022,Ivrea,Torino,Piemonte,2 diff --git a/smoke/ispra_http_csv/sql/clean.sql b/smoke/ispra_http_csv/sql/clean.sql deleted file mode 100644 index 70e1ce8..0000000 --- a/smoke/ispra_http_csv/sql/clean.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT 1 AS ok diff --git a/smoke/ispra_http_csv/sql/mart/mart_ok.sql b/smoke/ispra_http_csv/sql/mart/mart_ok.sql deleted file mode 100644 index 70e1ce8..0000000 --- a/smoke/ispra_http_csv/sql/mart/mart_ok.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT 1 AS ok From 93ac41c18b835b1e3baa668e82112a1a4448290c Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Sat, 28 Mar 2026 22:57:03 +0000 Subject: [PATCH 4/7] feat: add sdmx source plugin --- docs/config-schema.md | 30 +++ smoke/README.md | 2 + smoke/istat_sdmx_22_289/README.md | 24 ++ smoke/istat_sdmx_22_289/dataset.yml | 37 +++ smoke/istat_sdmx_22_289/sql/clean.sql | 7 + smoke/istat_sdmx_22_289/sql/mart/mart_ok.sql | 6 + tests/test_raw_ext_inference.py | 4 + tests/test_registry.py | 1 + tests/test_sdmx_plugin.py | 154 +++++++++++ toolkit/core/registry.py | 7 + toolkit/plugins/__init__.py | 1 + toolkit/plugins/sdmx.py | 253 +++++++++++++++++++ toolkit/raw/run.py | 10 + 13 files changed, 536 insertions(+) create mode 100644 smoke/istat_sdmx_22_289/README.md create mode 100644 smoke/istat_sdmx_22_289/dataset.yml create mode 100644 smoke/istat_sdmx_22_289/sql/clean.sql create mode 100644 smoke/istat_sdmx_22_289/sql/mart/mart_ok.sql create mode 100644 tests/test_sdmx_plugin.py create mode 100644 toolkit/plugins/sdmx.py diff --git a/docs/config-schema.md b/docs/config-schema.md index 63ddb91..2cdee89 100644 --- a/docs/config-schema.md +++ b/docs/config-schema.md @@ -80,6 +80,36 @@ Note pratiche per `ckan`: - se il portale restituisce un file URL in `http://`, il toolkit lo forza automaticamente a `https://` - se `filename` non e dichiarato, il toolkit prova a inferire l'estensione dall'URL risolto +Esempio `sdmx`: + +```yaml +raw: + sources: + - name: popolazione_residente + type: sdmx + client: + timeout: 60 + retries: 2 + args: + agency: IT1 + flow: 22_289 + version: "1.5" + filters: + FREQ: A + REF_AREA: "001001" + DATA_TYPE: JAN + SEX: "9" + AGE: TOTAL + MARITAL_STATUS: "99" +``` + +Note pratiche per `sdmx`: + +- la `version` e' obbligatoria e deve coincidere con la versione corrente esposta dal dataflow +- non esiste fallback silenzioso a `latest` +- in v1 i `filters` sono supportati solo sulle dimensioni di serie, non su `TIME_PERIOD` +- il plugin restituisce un CSV normalizzato con colonne `DIM`, `DIM_label` e `value` + ## clean | Campo | Tipo | Default | diff --git a/smoke/README.md b/smoke/README.md index ccdc7ce..df89bf6 100644 --- a/smoke/README.md +++ b/smoke/README.md @@ -16,10 +16,12 @@ Di conseguenza: Progetti inclusi: +- `smoke/ispra_http_csv`: `http_file` contro server locale `http.server` - `smoke/local_file_csv`: `local_file` completamente offline - `smoke/zip_http_csv`: `http_file` + extractor ZIP (`unzip_first_csv`) contro server locale - `smoke/bdap_http_csv`: `http_file` contro CSV pubblico BDAP - `smoke/bdap_ckan_csv`: `ckan` contro OpenBDAP, con fallback `package_show` e force `https` +- `smoke/istat_sdmx_22_289`: `sdmx` contro flow ISTAT reale `22_289` - `smoke/finanze_http_zip_2023`: `http_file` contro ZIP pubblico reale, best-effort Ogni progetto include: diff --git a/smoke/istat_sdmx_22_289/README.md b/smoke/istat_sdmx_22_289/README.md new file mode 100644 index 0000000..3cc9e9a --- /dev/null +++ b/smoke/istat_sdmx_22_289/README.md @@ -0,0 +1,24 @@ +# istat_sdmx_22_289 + +Smoke manuale per `sdmx` su dataflow ISTAT `22_289`. + +## Comandi + +```bash +toolkit run raw --config dataset.yml +toolkit profile raw --config dataset.yml +toolkit run clean --config dataset.yml +toolkit run mart --config dataset.yml +toolkit status --dataset istat_sdmx_22_289 --year 2024 --latest --config dataset.yml +``` + +## Verifiche attese + +- `./_smoke_out/data/raw/istat_sdmx_22_289/2024/manifest.json` +- `./_smoke_out/data/raw/istat_sdmx_22_289/2024/raw_validation.json` +- `./_smoke_out/data/clean/istat_sdmx_22_289/2024/metadata.json` +- `./_smoke_out/data/mart/istat_sdmx_22_289/2024/mart_ok.parquet` + +## Nota + +Questo smoke usa un flow ISTAT reale e richiede che la versione `1.5` sia ancora quella esposta dal dataflow `22_289`. diff --git a/smoke/istat_sdmx_22_289/dataset.yml b/smoke/istat_sdmx_22_289/dataset.yml new file mode 100644 index 0000000..4a79a80 --- /dev/null +++ b/smoke/istat_sdmx_22_289/dataset.yml @@ -0,0 +1,37 @@ +root: "./_smoke_out" + +dataset: + name: "istat_sdmx_22_289" + years: [2024] + +raw: + sources: + - name: "popolazione_residente" + type: "sdmx" + args: + agency: "IT1" + flow: "22_289" + version: "1.5" + filters: + FREQ: "A" + REF_AREA: "001001" + DATA_TYPE: "JAN" + SEX: "9" + AGE: "TOTAL" + MARITAL_STATUS: "99" + filename: "istat_popolazione_residente_22_289.csv" + +clean: + sql: "sql/clean.sql" + read_mode: "strict" + read: + mode: "explicit" + delim: "," + encoding: "utf-8" + header: true + validate: {} + +mart: + tables: + - name: "mart_ok" + sql: "sql/mart/mart_ok.sql" diff --git a/smoke/istat_sdmx_22_289/sql/clean.sql b/smoke/istat_sdmx_22_289/sql/clean.sql new file mode 100644 index 0000000..19434cb --- /dev/null +++ b/smoke/istat_sdmx_22_289/sql/clean.sql @@ -0,0 +1,7 @@ +select + TIME_PERIOD, + REF_AREA, + REF_AREA_label, + try_cast(value as bigint) as residenti +from raw_input +where TIME_PERIOD = '2024' diff --git a/smoke/istat_sdmx_22_289/sql/mart/mart_ok.sql b/smoke/istat_sdmx_22_289/sql/mart/mart_ok.sql new file mode 100644 index 0000000..f8c1c17 --- /dev/null +++ b/smoke/istat_sdmx_22_289/sql/mart/mart_ok.sql @@ -0,0 +1,6 @@ +select + TIME_PERIOD, + REF_AREA, + REF_AREA_label, + residenti +from clean_input diff --git a/tests/test_raw_ext_inference.py b/tests/test_raw_ext_inference.py index c6c1285..3f47021 100644 --- a/tests/test_raw_ext_inference.py +++ b/tests/test_raw_ext_inference.py @@ -25,6 +25,10 @@ def test_infer_ext_ckan_uses_resolved_origin(): assert _infer_ext("ckan", {}, origin="https://example.org/archive.zip.php") == ".zip" +def test_infer_ext_sdmx_is_csv(): + assert _infer_ext("sdmx", {"flow": "22_289"}) == ".csv" + + def test_infer_ext_never_returns_php(): assert _infer_ext("http_file", {"url": "https://example.org/download.php?id=42"}) != ".php" assert _infer_ext("local_file", {"path": "C:/tmp/file.php"}) != ".php" diff --git a/tests/test_registry.py b/tests/test_registry.py index 146777b..e75d8b1 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -22,5 +22,6 @@ def test_register_builtin_plugins_registers_present_plugins(): plugins = r.list_plugins() assert "ckan" in plugins + assert "sdmx" in plugins assert "http_file" in plugins assert "local_file" in plugins diff --git a/tests/test_sdmx_plugin.py b/tests/test_sdmx_plugin.py new file mode 100644 index 0000000..e9be7c2 --- /dev/null +++ b/tests/test_sdmx_plugin.py @@ -0,0 +1,154 @@ +from toolkit.core.exceptions import DownloadError +from toolkit.plugins.sdmx import SdmxSource + + +class _FakeResponse: + def __init__(self, status_code: int, text: str, url: str): + self.status_code = status_code + self.text = text + self.url = url + + +DATAFLOW_XML = """ + + + + + + + + + + + +""" + +DATA_JSON = """ +{ + "dataSets": [ + { + "series": { + "0:0:0:0:0:0": { + "observations": { + "0": [2634], + "1": [2621] + } + } + } + } + ], + "structure": { + "dimensions": { + "series": [ + {"id": "FREQ", "values": [{"id": "A", "name": "annual"}]}, + {"id": "REF_AREA", "values": [{"id": "001001", "name": "Agliè"}]}, + {"id": "DATA_TYPE", "values": [{"id": "JAN", "name": "population on 1st January"}]}, + {"id": "SEX", "values": [{"id": "9", "name": "total"}]}, + {"id": "AGE", "values": [{"id": "TOTAL", "name": "total"}]}, + {"id": "MARITAL_STATUS", "values": [{"id": "99", "name": "total"}]} + ], + "observation": [ + { + "id": "TIME_PERIOD", + "values": [ + {"id": "2024", "name": "2024"}, + {"id": "2025", "name": "2025"} + ] + } + ] + } + } +} +""" + +PREVIEW_JSON = """ +{ + "dataSets": [{"series": {}}], + "structure": { + "dimensions": { + "series": [ + {"id": "FREQ"}, + {"id": "REF_AREA"}, + {"id": "DATA_TYPE"}, + {"id": "SEX"}, + {"id": "AGE"}, + {"id": "MARITAL_STATUS"} + ] + } + } +} +""" + + +def test_sdmx_fetch_normalizes_csv(monkeypatch): + calls = [] + + def _fake_get(url, params=None, timeout=None, headers=None): + calls.append((url, params, headers.get("Accept") if headers else None)) + if url.endswith("/dataflow/IT1/22_289"): + return _FakeResponse(200, DATAFLOW_XML, url) + if url.endswith("/data/22_289/all"): + return _FakeResponse(200, PREVIEW_JSON, url) + if url.endswith("/data/22_289/A.001001.JAN.9.TOTAL.99"): + return _FakeResponse(200, DATA_JSON, url) + raise AssertionError(f"Unexpected URL {url}") + + monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get) + + payload, origin = SdmxSource().fetch( + "IT1", + "22_289", + "1.5", + { + "FREQ": "A", + "REF_AREA": "001001", + "DATA_TYPE": "JAN", + "SEX": "9", + "AGE": "TOTAL", + "MARITAL_STATUS": "99", + }, + ) + + text = payload.decode("utf-8") + assert origin.endswith("/data/22_289/A.001001.JAN.9.TOTAL.99") + assert "FREQ,FREQ_label" in text + assert "A,annual" in text + assert "001001,Agliè" in text + assert "2024,2024,2634" in text + assert any(call[2] == "application/json" for call in calls) + assert any(call[1] == {"firstNObservations": "0"} for call in calls) + + +def test_sdmx_fetch_blocks_version_mismatch(monkeypatch): + def _fake_get(url, params=None, timeout=None, headers=None): + if url.endswith("/dataflow/IT1/22_289"): + return _FakeResponse(200, DATAFLOW_XML, url) + raise AssertionError(f"Unexpected URL {url}") + + monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get) + + try: + SdmxSource().fetch("IT1", "22_289", "1.0", {"FREQ": "A"}) + except DownloadError as exc: + assert "current version is 1.5" in str(exc) + else: + raise AssertionError("Expected DownloadError") + + +def test_sdmx_fetch_rejects_unknown_filter_dimension(monkeypatch): + def _fake_get(url, params=None, timeout=None, headers=None): + if url.endswith("/dataflow/IT1/22_289"): + return _FakeResponse(200, DATAFLOW_XML, url) + if url.endswith("/data/22_289/all"): + return _FakeResponse(200, PREVIEW_JSON, url) + raise AssertionError(f"Unexpected URL {url}") + + monkeypatch.setattr("toolkit.plugins.sdmx.requests.get", _fake_get) + + try: + SdmxSource().fetch("IT1", "22_289", "1.5", {"TIME_PERIOD": "2024"}) + except DownloadError as exc: + assert "Unknown SDMX filter dimensions" in str(exc) + else: + raise AssertionError("Expected DownloadError") diff --git a/toolkit/core/registry.py b/toolkit/core/registry.py index 438868e..a37238d 100644 --- a/toolkit/core/registry.py +++ b/toolkit/core/registry.py @@ -51,6 +51,13 @@ def clear(self) -> None: "optional": False, "factory": lambda cls: (lambda **client: cls(**client)), }, + { + "name": "sdmx", + "module": "toolkit.plugins.sdmx", + "class_name": "SdmxSource", + "optional": False, + "factory": lambda cls: (lambda **client: cls(**client)), + }, { "name": "http_file", "module": "toolkit.plugins.http_file", diff --git a/toolkit/plugins/__init__.py b/toolkit/plugins/__init__.py index c7accfb..cea154e 100644 --- a/toolkit/plugins/__init__.py +++ b/toolkit/plugins/__init__.py @@ -7,4 +7,5 @@ - `local_file` - `http_file` - `ckan` +- `sdmx` """ diff --git a/toolkit/plugins/sdmx.py b/toolkit/plugins/sdmx.py new file mode 100644 index 0000000..8469f54 --- /dev/null +++ b/toolkit/plugins/sdmx.py @@ -0,0 +1,253 @@ +from __future__ import annotations + +import csv +import io +import json +import xml.etree.ElementTree as ET + +import requests + +from toolkit.core.exceptions import DownloadError + +SDMX_NS = { + "mes": "http://www.sdmx.org/resources/sdmxml/schemas/v2_1/message", + "str": "http://www.sdmx.org/resources/sdmxml/schemas/v2_1/structure", +} + + +def _safe_text(value: str | None) -> str: + return (value or "").strip() + + +def _normalize_base_url(url: str) -> str: + return url.rstrip("/") + + +class SdmxSource: + """Fetch SDMX data as a normalized CSV payload.""" + + def __init__( + self, + timeout: int = 60, + retries: int = 2, + user_agent: str | None = None, + data_base_url: str | None = None, + metadata_base_url: str | None = None, + ): + self.timeout = timeout + self.retries = retries + self.user_agent = user_agent or "dataciviclab-toolkit/0.1" + self.data_base_url = _normalize_base_url( + data_base_url or "https://esploradati.istat.it/SDMXWS/rest" + ) + self.metadata_base_url = _normalize_base_url( + metadata_base_url or "https://sdmx.istat.it/SDMXWS/rest" + ) + + def _get_text( + self, + base_url: str, + path: str, + *, + accept: str | None = None, + params: dict[str, str] | None = None, + ) -> tuple[str, str]: + headers = {"User-Agent": self.user_agent} + if accept: + headers["Accept"] = accept + + url = f"{_normalize_base_url(base_url)}/{path.lstrip('/')}" + last_err: Exception | None = None + for _ in range(max(1, self.retries)): + try: + response = requests.get(url, params=params, timeout=self.timeout, headers=headers) + if response.status_code != 200: + raise DownloadError(f"HTTP {response.status_code} for {response.url}") + return response.text, response.url + except Exception as exc: + last_err = exc + raise DownloadError(str(last_err) if last_err else f"Failed to fetch {url}") + + def _get_json( + self, + base_url: str, + path: str, + *, + params: dict[str, str] | None = None, + ) -> tuple[dict, str]: + text, origin = self._get_text( + base_url, + path, + accept="application/json", + params=params, + ) + try: + return json.loads(text), origin + except json.JSONDecodeError as exc: + raise DownloadError(f"Invalid SDMX JSON payload from {origin}") from exc + + def _get_dataflow(self, agency: str, flow: str) -> ET.Element: + xml_text, _origin = self._get_text(self.metadata_base_url, f"dataflow/{agency}/{flow}") + try: + return ET.fromstring(xml_text) + except ET.ParseError as exc: + raise DownloadError(f"Invalid SDMX XML metadata for flow={flow}") from exc + + def _current_version(self, root: ET.Element) -> str: + dataflow = root.find(".//str:Dataflow", SDMX_NS) + if dataflow is None: + raise DownloadError("SDMX dataflow not found") + + structure_ref = dataflow.find(".//str:Structure/Ref", SDMX_NS) + if structure_ref is None: + raise DownloadError("SDMX dataflow missing Structure/Ref") + + version = _safe_text(structure_ref.attrib.get("version")) + return version + + def _preview_dimensions(self, flow: str) -> list[str]: + payload, _origin = self._get_json( + self.data_base_url, + f"data/{flow}/all", + params={"firstNObservations": "0"}, + ) + structure = payload.get("structure") or {} + dimensions = (structure.get("dimensions") or {}).get("series") or [] + result: list[str] = [] + for dim in dimensions: + dim_id = str(dim.get("id") or "") + if dim_id: + result.append(dim_id) + if not result: + raise DownloadError(f"SDMX structure preview returned no series dimensions for flow={flow}") + return result + + def _build_key(self, dimensions: list[str], filters: dict | None) -> str: + filters = filters or {} + unknown = sorted(set(filters.keys()) - set(dimensions)) + if unknown: + raise DownloadError( + "Unknown SDMX filter dimensions: " + ", ".join(unknown) + ) + + if not dimensions: + return "all" + + parts: list[str] = [] + for dim in dimensions: + value = filters.get(dim) + if value is None: + parts.append("") + continue + if isinstance(value, (list, tuple)): + token = "+".join(str(item) for item in value) + else: + token = str(value) + parts.append(token) + + key = ".".join(parts).rstrip(".") + return key or "all" + + def _dimension_value(self, dimension: dict, index_token: str) -> tuple[str, str]: + values = dimension.get("values") or [] + idx = int(index_token) + if idx >= len(values): + raise DownloadError( + f"SDMX dimension index {idx} out of range for {dimension.get('id')}" + ) + entry = values[idx] + code = str(entry.get("id") or "") + label = str(entry.get("name") or code) + return code, label + + def _normalize_rows(self, payload: dict) -> tuple[list[str], list[dict[str, object]]]: + structure = payload.get("structure") or {} + dimensions = structure.get("dimensions") or {} + series_dims = dimensions.get("series") or [] + observation_dims = dimensions.get("observation") or [] + + header: list[str] = [] + for dim in series_dims: + dim_id = str(dim.get("id") or "") + if dim_id: + header.extend([dim_id, f"{dim_id}_label"]) + for dim in observation_dims: + dim_id = str(dim.get("id") or "") + if dim_id: + header.extend([dim_id, f"{dim_id}_label"]) + header.append("value") + + rows: list[dict[str, object]] = [] + for dataset in payload.get("dataSets") or []: + for series_key, series_val in (dataset.get("series") or {}).items(): + series_parts = series_key.split(":") if series_key else [] + series_ctx: dict[str, object] = {} + for idx, token in enumerate(series_parts): + if idx >= len(series_dims): + continue + dim = series_dims[idx] + dim_id = str(dim.get("id") or "") + code, label = self._dimension_value(dim, token) + series_ctx[dim_id] = code + series_ctx[f"{dim_id}_label"] = label + + for obs_key, obs_val in (series_val.get("observations") or {}).items(): + row = dict(series_ctx) + obs_parts = obs_key.split(":") if obs_key else [] + for idx, token in enumerate(obs_parts): + if idx >= len(observation_dims): + continue + dim = observation_dims[idx] + dim_id = str(dim.get("id") or "") + code, label = self._dimension_value(dim, token) + row[dim_id] = code + row[f"{dim_id}_label"] = label + + if isinstance(obs_val, list) and obs_val: + row["value"] = obs_val[0] + else: + row["value"] = obs_val + rows.append(row) + + return header, rows + + def _rows_to_csv(self, header: list[str], rows: list[dict[str, object]]) -> bytes: + buffer = io.StringIO(newline="") + writer = csv.DictWriter(buffer, fieldnames=header) + writer.writeheader() + for row in rows: + writer.writerow({col: row.get(col) for col in header}) + return buffer.getvalue().encode("utf-8") + + def fetch( + self, + agency: str, + flow: str, + version: str, + filters: dict | None = None, + ) -> tuple[bytes, str]: + agency = _safe_text(agency) or "IT1" + flow = _safe_text(flow) + version = _safe_text(version) + + if not flow: + raise DownloadError("SDMX source requires flow") + if not version: + raise DownloadError("SDMX source requires version") + + dataflow_root = self._get_dataflow(agency, flow) + current_version = self._current_version(dataflow_root) + + if current_version != version: + raise DownloadError( + f"Requested SDMX version {version} for {agency}/{flow} is not available; " + f"current version is {current_version}" + ) + + dimensions = self._preview_dimensions(flow) + key = self._build_key(dimensions, filters) + payload, origin = self._get_json(self.data_base_url, f"data/{flow}/{key}") + header, rows = self._normalize_rows(payload) + if not rows: + raise DownloadError(f"SDMX data returned no rows for {agency}/{flow} and key={key}") + return self._rows_to_csv(header, rows), origin diff --git a/toolkit/raw/run.py b/toolkit/raw/run.py index 8cebe22..ae84a5d 100644 --- a/toolkit/raw/run.py +++ b/toolkit/raw/run.py @@ -25,6 +25,9 @@ def _format_args(args: dict, year: int) -> dict: def _infer_ext(stype: str, formatted_args: dict, origin: str | None = None) -> str: + if stype == "sdmx": + return ".csv" + if stype in {"http_file", "ckan"}: url = origin or formatted_args.get("url", "") parsed = urlparse(url) @@ -75,6 +78,13 @@ def _fetch_payload(stype: str, client: dict, formatted_args: dict) -> tuple[byte str(formatted_args["resource_id"]) if formatted_args.get("resource_id") is not None else None, str(formatted_args["dataset_id"]) if formatted_args.get("dataset_id") is not None else None, ) + elif stype == "sdmx": + payload, origin = src.fetch( + str(formatted_args.get("agency") or "IT1"), + str(formatted_args["flow"]), + str(formatted_args["version"]), + formatted_args.get("filters"), + ) elif stype == "http_file": payload = src.fetch(formatted_args["url"]) origin = formatted_args["url"] From 23138ffce93f254113ac0f2eb4feed12b46eb391 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Mon, 30 Mar 2026 18:01:41 +0100 Subject: [PATCH 5/7] fix: avoid retrying deterministic ckan metadata errors --- toolkit/plugins/ckan.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/toolkit/plugins/ckan.py b/toolkit/plugins/ckan.py index a89fdb7..1b05fe7 100644 --- a/toolkit/plugins/ckan.py +++ b/toolkit/plugins/ckan.py @@ -52,6 +52,8 @@ def _get_json(self, url: str, params: dict[str, str]) -> dict: if not data.get("success"): raise DownloadError(f"CKAN API failed for {response.url}") return data + except DownloadError: + raise except Exception as exc: last_err = exc raise DownloadError(str(last_err) if last_err else f"Failed to fetch CKAN metadata from {url}") From 0a402c10a2a02028fc6ffccdec30dc00ebfe53c5 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Mon, 30 Mar 2026 18:04:58 +0100 Subject: [PATCH 6/7] docs: clarify sdmx TIME_PERIOD filtering --- docs/config-schema.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/config-schema.md b/docs/config-schema.md index 2cdee89..783ce11 100644 --- a/docs/config-schema.md +++ b/docs/config-schema.md @@ -108,6 +108,7 @@ Note pratiche per `sdmx`: - la `version` e' obbligatoria e deve coincidere con la versione corrente esposta dal dataflow - non esiste fallback silenzioso a `latest` - in v1 i `filters` sono supportati solo sulle dimensioni di serie, non su `TIME_PERIOD` +- il filtro temporale va applicato nel layer `clean.sql` (per esempio `WHERE TIME_PERIOD = '2024'`), non in `raw.sources[].args.filters` - il plugin restituisce un CSV normalizzato con colonne `DIM`, `DIM_label` e `value` ## clean From 27df571c5421c2c2c319685b1af46796df6b5e57 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Tue, 31 Mar 2026 15:35:12 +0100 Subject: [PATCH 7/7] fix: scope sdmx data queries to agency and version --- tests/test_sdmx_plugin.py | 8 ++++---- toolkit/plugins/sdmx.py | 18 +++++++++++++----- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/tests/test_sdmx_plugin.py b/tests/test_sdmx_plugin.py index e9be7c2..a8184b4 100644 --- a/tests/test_sdmx_plugin.py +++ b/tests/test_sdmx_plugin.py @@ -88,9 +88,9 @@ def _fake_get(url, params=None, timeout=None, headers=None): calls.append((url, params, headers.get("Accept") if headers else None)) if url.endswith("/dataflow/IT1/22_289"): return _FakeResponse(200, DATAFLOW_XML, url) - if url.endswith("/data/22_289/all"): + if url.endswith("/data/IT1,22_289,1.5/all"): return _FakeResponse(200, PREVIEW_JSON, url) - if url.endswith("/data/22_289/A.001001.JAN.9.TOTAL.99"): + if url.endswith("/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99"): return _FakeResponse(200, DATA_JSON, url) raise AssertionError(f"Unexpected URL {url}") @@ -111,7 +111,7 @@ def _fake_get(url, params=None, timeout=None, headers=None): ) text = payload.decode("utf-8") - assert origin.endswith("/data/22_289/A.001001.JAN.9.TOTAL.99") + assert origin.endswith("/data/IT1,22_289,1.5/A.001001.JAN.9.TOTAL.99") assert "FREQ,FREQ_label" in text assert "A,annual" in text assert "001001,Agliè" in text @@ -140,7 +140,7 @@ def test_sdmx_fetch_rejects_unknown_filter_dimension(monkeypatch): def _fake_get(url, params=None, timeout=None, headers=None): if url.endswith("/dataflow/IT1/22_289"): return _FakeResponse(200, DATAFLOW_XML, url) - if url.endswith("/data/22_289/all"): + if url.endswith("/data/IT1,22_289,1.5/all"): return _FakeResponse(200, PREVIEW_JSON, url) raise AssertionError(f"Unexpected URL {url}") diff --git a/toolkit/plugins/sdmx.py b/toolkit/plugins/sdmx.py index 8469f54..f9b4542 100644 --- a/toolkit/plugins/sdmx.py +++ b/toolkit/plugins/sdmx.py @@ -23,6 +23,10 @@ def _normalize_base_url(url: str) -> str: return url.rstrip("/") +def _flow_ref(agency: str, flow: str, version: str) -> str: + return f"{agency},{flow},{version}" + + class SdmxSource: """Fetch SDMX data as a normalized CSV payload.""" @@ -105,10 +109,11 @@ def _current_version(self, root: ET.Element) -> str: version = _safe_text(structure_ref.attrib.get("version")) return version - def _preview_dimensions(self, flow: str) -> list[str]: + def _preview_dimensions(self, agency: str, flow: str, version: str) -> list[str]: + flow_ref = _flow_ref(agency, flow, version) payload, _origin = self._get_json( self.data_base_url, - f"data/{flow}/all", + f"data/{flow_ref}/all", params={"firstNObservations": "0"}, ) structure = payload.get("structure") or {} @@ -119,7 +124,9 @@ def _preview_dimensions(self, flow: str) -> list[str]: if dim_id: result.append(dim_id) if not result: - raise DownloadError(f"SDMX structure preview returned no series dimensions for flow={flow}") + raise DownloadError( + f"SDMX structure preview returned no series dimensions for {flow_ref}" + ) return result def _build_key(self, dimensions: list[str], filters: dict | None) -> str: @@ -244,9 +251,10 @@ def fetch( f"current version is {current_version}" ) - dimensions = self._preview_dimensions(flow) + flow_ref = _flow_ref(agency, flow, version) + dimensions = self._preview_dimensions(agency, flow, version) key = self._build_key(dimensions, filters) - payload, origin = self._get_json(self.data_base_url, f"data/{flow}/{key}") + payload, origin = self._get_json(self.data_base_url, f"data/{flow_ref}/{key}") header, rows = self._normalize_rows(payload) if not rows: raise DownloadError(f"SDMX data returned no rows for {agency}/{flow} and key={key}")