diff --git a/docs/config-schema.md b/docs/config-schema.md index 8787c00..63ddb91 100644 --- a/docs/config-schema.md +++ b/docs/config-schema.md @@ -57,6 +57,29 @@ I path relativi sono sempre risolti rispetto alla directory che contiene `datase `raw.sources[].args` e `raw.extractor.args` devono essere sempre oggetti YAML, non liste o stringhe. +Esempio `ckan`: + +```yaml +raw: + sources: + - name: bdap_lea + type: ckan + client: + timeout: 60 + retries: 2 + args: + portal_url: https://bdap-opendata.rgs.mef.gov.it/SpodCkanApi/api/3 + dataset_id: "d598ebd9-949d-4214-bb33-cd9c1be08f15" + resource_id: "33344" +``` + +Note pratiche per `ckan`: + +- il toolkit interroga `resource_show` prima del download +- se `resource_show` non è disponibile o non risolve il file, il toolkit ripiega su `package_show` +- se il portale restituisce un file URL in `http://`, il toolkit lo forza automaticamente a `https://` +- se `filename` non è dichiarato, il toolkit prova a inferire l'estensione dall'URL risolto + ## clean | Campo | Tipo | Default | diff --git a/smoke/README.md b/smoke/README.md index 0f6d35d..ccdc7ce 100644 --- a/smoke/README.md +++ b/smoke/README.md @@ -16,10 +16,10 @@ Di conseguenza: Progetti inclusi: -- `smoke/ispra_http_csv`: `http_file` contro server locale `http.server` - `smoke/local_file_csv`: `local_file` completamente offline - `smoke/zip_http_csv`: `http_file` + extractor ZIP (`unzip_first_csv`) contro server locale - `smoke/bdap_http_csv`: `http_file` contro CSV pubblico BDAP +- `smoke/bdap_ckan_csv`: `ckan` contro OpenBDAP, con fallback `package_show` e force `https` - `smoke/finanze_http_zip_2023`: `http_file` contro ZIP pubblico reale, best-effort Ogni progetto include: diff --git a/smoke/bdap_ckan_csv/README.md b/smoke/bdap_ckan_csv/README.md new file mode 100644 index 0000000..ac82f18 --- /dev/null +++ b/smoke/bdap_ckan_csv/README.md @@ -0,0 +1,28 @@ +# bdap_ckan_csv + +Smoke manuale per `ckan` contro OpenBDAP, con fallback `resource_show -> 
package_show` e force `https`. + +## Comandi + +```bash +toolkit run raw --config dataset.yml +toolkit profile raw --config dataset.yml +toolkit run clean --config dataset.yml +toolkit run mart --config dataset.yml +toolkit status --dataset bdap_ckan_csv --year 2022 --latest --config dataset.yml +``` + +## Verifiche attese + +- `./_smoke_out/data/raw/bdap_ckan_csv/2022/manifest.json` +- `./_smoke_out/data/raw/bdap_ckan_csv/2022/raw_validation.json` +- `./_smoke_out/data/raw/bdap_ckan_csv/2022/_profile/raw_profile.json` +- `./_smoke_out/data/raw/bdap_ckan_csv/2022/_profile/suggested_read.yml` +- `./_smoke_out/data/clean/bdap_ckan_csv/2022/metadata.json` con `read_params_source`, `read_source_used`, `read_params_used` +- `./_smoke_out/data/mart/bdap_ckan_csv/2022/mart_ok.parquet` + +## Note + +- questo smoke usa un dataset OpenBDAP reale +- il portale espone `package_show`, ma `resource_show` non risolve: il caso serve proprio a verificare il fallback del plugin +- l'URL file restituito dal portale può arrivare in `http://`: il toolkit lo forza a `https://` prima del download diff --git a/smoke/bdap_ckan_csv/dataset.yml b/smoke/bdap_ckan_csv/dataset.yml new file mode 100644 index 0000000..9af0c3c --- /dev/null +++ b/smoke/bdap_ckan_csv/dataset.yml @@ -0,0 +1,29 @@ +root: "./_smoke_out" + +dataset: + name: "bdap_ckan_csv" + years: [2022] + +raw: + sources: + - type: "ckan" + args: + portal_url: "https://bdap-opendata.rgs.mef.gov.it/SpodCkanApi/api/3" + dataset_id: "d598ebd9-949d-4214-bb33-cd9c1be08f15" + resource_id: "33344" + filename: "bdap_lea_2024.csv" + +clean: + sql: "sql/clean.sql" + read: + delim: ";" + decimal: "." 
+ encoding: "utf-8" + header: true + columns: null + validate: {} + +mart: + tables: + - name: "mart_ok" + sql: "sql/mart/mart_ok.sql" diff --git a/smoke/bdap_ckan_csv/sql/clean.sql b/smoke/bdap_ckan_csv/sql/clean.sql new file mode 100644 index 0000000..c31afca --- /dev/null +++ b/smoke/bdap_ckan_csv/sql/clean.sql @@ -0,0 +1,16 @@ +WITH base AS ( + SELECT + TRY_CAST(TRIM(CAST("Anno di Riferimento" AS VARCHAR)) AS INTEGER) AS anno, + TRY_CAST(TRIM(CAST("Codice Regione" AS VARCHAR)) AS INTEGER) AS codice_regione, + TRIM(CAST("Descrizione Regione" AS VARCHAR)) AS regione, + TRY_CAST(TRIM(CAST("Codice Ente SSN" AS VARCHAR)) AS INTEGER) AS codice_ente_ssn, + TRIM(CAST("Descrizione Ente" AS VARCHAR)) AS descrizione_ente, + TRIM(CAST("Codice Voce Contabile" AS VARCHAR)) AS codice_voce_contabile, + TRIM(CAST("Descrizione Voce Contabile" AS VARCHAR)) AS descrizione_voce_contabile, + TRY_CAST(TRIM(CAST("Importo Totale" AS VARCHAR)) AS DOUBLE) AS importo_totale + FROM raw_input +) + +SELECT * +FROM base +WHERE anno IS NOT NULL; diff --git a/smoke/bdap_ckan_csv/sql/mart/mart_ok.sql b/smoke/bdap_ckan_csv/sql/mart/mart_ok.sql new file mode 100644 index 0000000..32f2b31 --- /dev/null +++ b/smoke/bdap_ckan_csv/sql/mart/mart_ok.sql @@ -0,0 +1,10 @@ +SELECT + anno, + regione, + descrizione_voce_contabile, + COUNT(*) AS righe, + SUM(importo_totale) AS importo_totale +FROM clean_input +WHERE anno IS NOT NULL +GROUP BY 1, 2, 3 +ORDER BY 1, 2, 3 diff --git a/smoke/ispra_http_csv/README.md b/smoke/ispra_http_csv/README.md deleted file mode 100644 index c3522e4..0000000 --- a/smoke/ispra_http_csv/README.md +++ /dev/null @@ -1,35 +0,0 @@ -# ispra_http_csv - -Smoke manuale per `http_file` con CSV servito localmente. 
- -## Setup - -Avvia un server HTTP dalla directory di questo progetto: - -```bash -py -m http.server 8000 -``` - -In un secondo terminale: - -```bash -toolkit run raw --config dataset.yml -toolkit profile raw --config dataset.yml -toolkit run clean --config dataset.yml -toolkit run mart --config dataset.yml -toolkit status --dataset ispra_http_csv --year 2022 --latest --config dataset.yml -``` - -## Verifiche attese - -- `./_smoke_out/data/raw/ispra_http_csv/2022/manifest.json` -- `./_smoke_out/data/raw/ispra_http_csv/2022/raw_validation.json` -- `./_smoke_out/data/raw/ispra_http_csv/2022/_profile/raw_profile.json` -- `./_smoke_out/data/raw/ispra_http_csv/2022/_profile/suggested_read.yml` -- `./_smoke_out/data/clean/ispra_http_csv/2022/metadata.json` con `read_params_source`, `read_source_used`, `read_params_used` -- `./_smoke_out/data/mart/ispra_http_csv/2022/mart_ok.parquet` - -Controlli contratto: - -- RAW `manifest.json` contiene `primary_output_file` -- CLEAN usa il manifest RAW e scrive `metadata.json` con audit fields di lettura diff --git a/smoke/ispra_http_csv/dataset.yml b/smoke/ispra_http_csv/dataset.yml deleted file mode 100644 index e62568d..0000000 --- a/smoke/ispra_http_csv/dataset.yml +++ /dev/null @@ -1,31 +0,0 @@ -root: "./_smoke_out" - -dataset: - name: "ispra_http_csv" - years: [2022] - -raw: - sources: - - type: "http_file" - args: - url: "https://www.catasto-rifiuti.isprambiente.it/get/getDettaglioComunale.csv.php?&aa={year}" - filename: "ispra_http_sample_{year}.csv" - -clean: - sql: "sql/clean.sql" - read: - encoding: "utf-8" - header: true - decimal: "," - skip: 1 - delim: ";" - null_padding: true - ignore_errors: true - auto_detect: false - strict_mode: false - columns: null - -mart: - tables: - - name: "mart_ok" - sql: "sql/mart/mart_ok.sql" diff --git a/smoke/ispra_http_csv/fixtures/ispra_http_sample.csv b/smoke/ispra_http_csv/fixtures/ispra_http_sample.csv deleted file mode 100644 index 24cf4f5..0000000 --- 
a/smoke/ispra_http_csv/fixtures/ispra_http_sample.csv +++ /dev/null @@ -1,3 +0,0 @@ -anno,comune,provincia,regione,valore -2022,Agliè,Torino,Piemonte,1 -2022,Ivrea,Torino,Piemonte,2 diff --git a/smoke/ispra_http_csv/sql/clean.sql b/smoke/ispra_http_csv/sql/clean.sql deleted file mode 100644 index 70e1ce8..0000000 --- a/smoke/ispra_http_csv/sql/clean.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT 1 AS ok diff --git a/smoke/ispra_http_csv/sql/mart/mart_ok.sql b/smoke/ispra_http_csv/sql/mart/mart_ok.sql deleted file mode 100644 index 70e1ce8..0000000 --- a/smoke/ispra_http_csv/sql/mart/mart_ok.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT 1 AS ok diff --git a/tests/test_ckan_plugin.py b/tests/test_ckan_plugin.py new file mode 100644 index 0000000..aad2bb7 --- /dev/null +++ b/tests/test_ckan_plugin.py @@ -0,0 +1,133 @@ +from toolkit.core.exceptions import DownloadError +from toolkit.plugins.ckan import CkanSource + + +class _FakeResponse: + def __init__(self, status_code: int, *, json_data=None, content: bytes = b"", url: str = "https://example.org"): + self.status_code = status_code + self._json_data = json_data + self.content = content + self.url = url + + def json(self): + return self._json_data + + +def test_ckan_fetch_resource_show_forces_https(monkeypatch): + calls = [] + + def _fake_get(url, params=None, timeout=None, headers=None): + calls.append((url, params)) + if "resource_show" in url: + return _FakeResponse( + 200, + json_data={ + "success": True, + "result": {"url": "http://portal.example.org/export/data.csv"}, + }, + url=f"{url}?id=abc", + ) + return _FakeResponse( + 200, + content=b"a,b\n1,2\n", + url="https://portal.example.org/export/data.csv", + ) + + monkeypatch.setattr("toolkit.plugins.ckan.requests.get", _fake_get) + + payload, origin = CkanSource().fetch("https://portal.example.org/api/3", resource_id="abc") + + assert payload == b"a,b\n1,2\n" + assert origin == "https://portal.example.org/export/data.csv" + assert calls[1][0] == 
"https://portal.example.org/export/data.csv" + + +def test_ckan_fetch_falls_back_to_package_show(monkeypatch): + calls = [] + + def _fake_get(url, params=None, timeout=None, headers=None): + calls.append((url, params)) + if "resource_show" in url: + return _FakeResponse(404, json_data={}, url=f"{url}?id=33344") + if "package_show" in url: + return _FakeResponse( + 200, + json_data={ + "success": True, + "result": { + "resources": [ + { + "id": 33344, + "name": "csv dump", + "format": "CSV", + "url": "http://portal.example.org/api/3/datastore/dump/dataset.csv", + } + ] + }, + }, + url=f"{url}?id=dataset-id", + ) + return _FakeResponse( + 200, + content=b"a,b\n1,2\n", + url="https://portal.example.org/api/3/datastore/dump/dataset.csv", + ) + + monkeypatch.setattr("toolkit.plugins.ckan.requests.get", _fake_get) + + payload, origin = CkanSource().fetch( + "https://portal.example.org/api/3", + resource_id="33344", + dataset_id="dataset-id", + ) + + assert payload == b"a,b\n1,2\n" + assert origin == "https://portal.example.org/api/3/datastore/dump/dataset.csv" + assert any("package_show" in call[0] for call in calls) + + +def test_ckan_fetch_requires_identifier(): + try: + CkanSource().fetch("https://portal.example.org/api/3") + except DownloadError as exc: + assert "resource_id or dataset_id" in str(exc) + else: + raise AssertionError("Expected DownloadError") + + +def test_ckan_fetch_rejects_package_fallback_when_resource_id_missing(monkeypatch): + def _fake_get(url, params=None, timeout=None, headers=None): + if "resource_show" in url: + return _FakeResponse(404, json_data={}, url=f"{url}?id=99999") + if "package_show" in url: + return _FakeResponse( + 200, + json_data={ + "success": True, + "result": { + "resources": [ + { + "id": 33344, + "name": "csv dump", + "format": "CSV", + "url": "http://portal.example.org/api/3/datastore/dump/dataset.csv", + } + ] + }, + }, + url=f"{url}?id=dataset-id", + ) + raise AssertionError(f"Unexpected download request to {url}") + + 
monkeypatch.setattr("toolkit.plugins.ckan.requests.get", _fake_get) + + try: + CkanSource().fetch( + "https://portal.example.org/api/3", + resource_id="99999", + dataset_id="dataset-id", + ) + except DownloadError as exc: + assert "resource_id=99999" in str(exc) + else: + raise AssertionError("Expected DownloadError") diff --git a/tests/test_raw_ext_inference.py b/tests/test_raw_ext_inference.py index 4126648..c6c1285 100644 --- a/tests/test_raw_ext_inference.py +++ b/tests/test_raw_ext_inference.py @@ -20,11 +20,41 @@ def test_infer_ext_http_csv_php_and_zip_php(): assert _infer_ext("http_file", {"url": "https://example.org/archive.zip.php"}) == ".zip" +def test_infer_ext_ckan_uses_resolved_origin(): + assert _infer_ext("ckan", {}, origin="https://example.org/dump/data.csv") == ".csv" + assert _infer_ext("ckan", {}, origin="https://example.org/archive.zip.php") == ".zip" + + def test_infer_ext_never_returns_php(): assert _infer_ext("http_file", {"url": "https://example.org/download.php?id=42"}) != ".php" assert _infer_ext("local_file", {"path": "C:/tmp/file.php"}) != ".php" +def test_run_raw_ckan_filename_inferred_from_resolved_url(monkeypatch, tmp_path: Path): + def _fake_fetch_payload(_stype: str, _client: dict, _formatted_args: dict): + return b"a,b\n1,2\n", "https://example.org/data.csv" + + monkeypatch.setattr("toolkit.raw.run._fetch_payload", _fake_fetch_payload) + + raw_cfg = { + "sources": [ + { + "name": "bdap_resource", + "type": "ckan", + "args": { + "portal_url": "https://portal.example.org/SpodCkanApi/api/3", + "resource_id": "33344", + }, + } + ] + } + + run_raw("demo", 2024, str(tmp_path), raw_cfg, _NoopLogger()) + + out_dir = tmp_path / "data" / "raw" / "demo" / "2024" + assert (out_dir / "bdap_resource.csv").exists() + + def test_run_raw_filename_override_has_priority(monkeypatch, tmp_path: Path): def _fake_fetch_payload(_stype: str, _client: dict, _formatted_args: dict): return b"a,b\n1,2\n", "https://example.org/dataset.csv.php" diff --git 
a/tests/test_registry.py b/tests/test_registry.py index 757e230..146777b 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -21,5 +21,6 @@ def test_register_builtin_plugins_registers_present_plugins(): register_builtin_plugins(registry_obj=r) plugins = r.list_plugins() + assert "ckan" in plugins assert "http_file" in plugins assert "local_file" in plugins diff --git a/toolkit/core/registry.py b/toolkit/core/registry.py index d8bcad5..438868e 100644 --- a/toolkit/core/registry.py +++ b/toolkit/core/registry.py @@ -44,6 +44,13 @@ def clear(self) -> None: registry = Registry() _BUILTIN_PLUGINS: tuple[dict[str, Any], ...] = ( + { + "name": "ckan", + "module": "toolkit.plugins.ckan", + "class_name": "CkanSource", + "optional": False, + "factory": lambda cls: (lambda **client: cls(**client)), + }, { "name": "http_file", "module": "toolkit.plugins.http_file", diff --git a/toolkit/plugins/__init__.py b/toolkit/plugins/__init__.py index f6fc45e..c7accfb 100644 --- a/toolkit/plugins/__init__.py +++ b/toolkit/plugins/__init__.py @@ -6,4 +6,5 @@ - `local_file` - `http_file` +- `ckan` """ diff --git a/toolkit/plugins/ckan.py b/toolkit/plugins/ckan.py new file mode 100644 index 0000000..bfb398f --- /dev/null +++ b/toolkit/plugins/ckan.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +from urllib.parse import urlparse, urlunparse + +import requests + +from toolkit.core.exceptions import DownloadError + + +def _normalize_resource_show_url(portal_url: str) -> str: + base = portal_url.rstrip("/") + if base.endswith("/api/3/action"): + return f"{base}/resource_show" + if base.endswith("/api/3"): + return f"{base}/action/resource_show" + return f"{base}/api/3/action/resource_show" + + +def _normalize_package_show_url(portal_url: str) -> str: + base = portal_url.rstrip("/") + if base.endswith("/api/3/action"): + return f"{base}/package_show" + if base.endswith("/api/3"): + return f"{base}/action/package_show" + return f"{base}/api/3/action/package_show" + + 
+def _force_https(url: str) -> str: + parsed = urlparse(url) + if parsed.scheme == "http": + return urlunparse(parsed._replace(scheme="https")) + return url + + +class CkanSource: + """Resolve a CKAN resource via resource_show, then download its current URL.""" + + def __init__(self, timeout: int = 60, retries: int = 2, user_agent: str | None = None): + self.timeout = timeout + self.retries = retries + self.user_agent = user_agent or "dataciviclab-toolkit/0.1" + + def _get_json(self, url: str, params: dict[str, str]) -> dict: + headers = {"User-Agent": self.user_agent} + last_err: Exception | None = None + for _ in range(max(1, self.retries)): + try: + response = requests.get(url, params=params, timeout=self.timeout, headers=headers) + if response.status_code != 200: + raise DownloadError(f"HTTP {response.status_code} for {response.url}") + data = response.json() + if not data.get("success"): + raise DownloadError(f"CKAN API failed for {response.url}") + return data + except Exception as exc: + last_err = exc + raise DownloadError(str(last_err) if last_err else f"Failed to fetch CKAN metadata from {url}") + + def _download_bytes(self, url: str) -> bytes: + headers = {"User-Agent": self.user_agent} + last_err: Exception | None = None + for _ in range(max(1, self.retries)): + try: + response = requests.get(url, timeout=self.timeout, headers=headers) + if response.status_code != 200: + raise DownloadError(f"HTTP {response.status_code} for {url}") + return response.content + except Exception as exc: + last_err = exc + raise DownloadError(str(last_err) if last_err else f"Failed to fetch {url}") + + def _select_resource_from_package(self, result: dict, resource_id: str | None) -> dict: + resources = result.get("resources") or [] + if not resources: + raise DownloadError("CKAN package_show returned no resources") + + if resource_id: + for item in resources: + if str(item.get("id")) == str(resource_id): + return item + raise DownloadError( + f"CKAN package_show did not 
contain requested resource_id={resource_id}" + ) + + with_url = [item for item in resources if item.get("url")] + if not with_url: + raise DownloadError("CKAN package_show returned resources without URL") + + def _score(item: dict) -> tuple[int, str]: + fmt = str(item.get("format") or "").lower() + url = str(item.get("url") or "").lower() + if "csv" in fmt or ".csv" in url: + rank = 0 + elif "zip" in fmt or ".zip" in url: + rank = 1 + elif "xlsx" in fmt or ".xlsx" in url or "xls" in fmt or ".xls" in url: + rank = 2 + elif "json" in fmt or ".json" in url: + rank = 3 + elif "xml" in fmt or ".xml" in url: + rank = 4 + else: + rank = 9 + return rank, str(item.get("name") or "") + + return sorted(with_url, key=_score)[0] + + def fetch( + self, + portal_url: str, + resource_id: str | None = None, + dataset_id: str | None = None, + ) -> tuple[bytes, str]: + last_err: Exception | None = None + + if resource_id: + api_url = _normalize_resource_show_url(portal_url) + try: + metadata = self._get_json(api_url, {"id": str(resource_id)}) + result = metadata.get("result") or {} + raw_url = result.get("url") + if raw_url: + resolved_url = _force_https(str(raw_url)) + return self._download_bytes(resolved_url), resolved_url + last_err = DownloadError( + f"CKAN resource_show returned no URL for resource_id={resource_id}" + ) + except Exception as exc: + last_err = exc + + package_identifier = dataset_id or resource_id + if package_identifier: + api_url = _normalize_package_show_url(portal_url) + try: + metadata = self._get_json(api_url, {"id": str(package_identifier)}) + result = metadata.get("result") or {} + resource = self._select_resource_from_package(result, resource_id) + resolved_url = _force_https(str(resource["url"])) + return self._download_bytes(resolved_url), resolved_url + except Exception as exc: + last_err = exc + + if last_err: + raise DownloadError(str(last_err)) + raise DownloadError("CKAN source requires resource_id or dataset_id") diff --git a/toolkit/raw/run.py 
b/toolkit/raw/run.py index 7ae8b2e..8cebe22 100644 --- a/toolkit/raw/run.py +++ b/toolkit/raw/run.py @@ -24,9 +24,9 @@ def _format_args(args: dict, year: int) -> dict: return formatted -def _infer_ext(stype: str, formatted_args: dict) -> str: - if stype == "http_file": - url = formatted_args.get("url", "") +def _infer_ext(stype: str, formatted_args: dict, origin: str | None = None) -> str: + if stype in {"http_file", "ckan"}: + url = origin or formatted_args.get("url", "") parsed = urlparse(url) path = parsed.path or "" low_path = path.lower() @@ -69,7 +69,13 @@ def _infer_ext(stype: str, formatted_args: dict) -> str: def _fetch_payload(stype: str, client: dict, formatted_args: dict) -> tuple[bytes, str]: src = registry.create(stype, **(client or {})) - if stype == "http_file": + if stype == "ckan": + payload, origin = src.fetch( + formatted_args["portal_url"], + str(formatted_args["resource_id"]) if formatted_args.get("resource_id") is not None else None, + str(formatted_args["dataset_id"]) if formatted_args.get("dataset_id") is not None else None, + ) + elif stype == "http_file": payload = src.fetch(formatted_args["url"]) origin = formatted_args["url"] elif stype == "local_file": @@ -210,7 +216,7 @@ def run_raw( if explicit: extracted = {explicit: payload} else: - ext = _infer_ext(stype, formatted_args) + ext = _infer_ext(stype, formatted_args, origin=origin) extracted = {f"{name}{ext}": payload} # scrittura file