diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 15628fa..06dea0e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,5 +1,31 @@ # Contributing +## Test Suite Tiers + +The test suite is intentionally stratified so release-critical coverage stays visible. + +- `core`: public contract and canonical workflow + - config and strict mode + - path contract relative to `dataset.yml` + - `run all`, `validate all`, `status`, `inspect paths` + - end-to-end RAW -> CLEAN -> MART + - run records, resume, validation layers +- `advanced`: supported but non-happy-path or secondary engine behavior + - detailed read modes and selection logic + - extractors and plugin registry + - profiling details + - artifact policy and logging helpers +- `compat`: compatibility-only behavior + - deprecated import shims and legacy-only coverage + +Useful commands: + +```bash +py -m pytest -m core +py -m pytest -m "core or advanced" +py -m pytest -m compat +``` + ## Git Hook Install the lightweight pre-commit guardrail before contributing: diff --git a/README.md b/README.md index c5b2661..62ff410 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,20 @@ Ruoli delle repo correlate: Questa repo non e' l'hub dell'organizzazione e non replica la documentazione org-wide: resta focalizzata sul motore e sul suo contratto tecnico. +## Confini Del Toolkit + +Il toolkit espone un perimetro volutamente stretto: + +- core runtime: `raw`, `clean`, `mart`, `run`, `validate`, `status`, `inspect` +- advanced tooling: `resume`, `profile raw`, run parziali per layer +- compatibility only: alias legacy e shim deprecati + +Regola pratica: + +- nuovi repo dataset: resta nel workflow canonico +- recovery o diagnostica: usa gli strumenti advanced +- bootstrap o compatibilita': non trattarli come parte del contratto stabile + ## Obiettivi - mantenere una struttura progetto semplice: `dataset.yml` + `sql/` @@ -31,7 +45,7 @@ Il toolkit include: - pipeline `raw`, `clean`, `mart` - validation gate post-layer integrato in `run` - run tracking persistente in `data/_runs/...` -- comandi CLI `run`, `resume`, `status`, `validate`, `profile`, `gen-sql` +- comandi CLI `run`, `resume`, `status`, `validate`, `profile`, `inspect` - `project-example/` offline per smoke test locale ## Installazione @@ -102,6 +116,7 @@ Schema completo e legacy supportato: [docs/config-schema.md](docs/config-schema. Flow avanzati e tooling secondario: [docs/advanced-workflows.md](docs/advanced-workflows.md) Matrice di stabilita`: [docs/feature-stability.md](docs/feature-stability.md) Contratto notebook/output: [docs/notebook-contract.md](docs/notebook-contract.md) +Confini runtime e superfici non-core: [docs/runtime-boundaries.md](docs/runtime-boundaries.md) Per policy condivise e community health organizzativa, fai riferimento alla repo `.github` dell'organizzazione. Artefatti attesi: @@ -189,7 +204,7 @@ toolkit inspect paths --config dataset.yml --year 2024 --json toolkit run all --config dataset.yml --dry-run --strict-config ``` -`resume`, `profile raw`, `run raw|clean|mart`, `gen-sql` e la policy completa degli artifacts restano disponibili, ma sono tooling avanzato: vedi [docs/advanced-workflows.md](docs/advanced-workflows.md). +`resume`, `profile raw`, `run raw|clean|mart` e la policy completa degli artifacts restano disponibili, ma sono tooling avanzato: vedi [docs/advanced-workflows.md](docs/advanced-workflows.md). ## Notebook locali @@ -208,6 +223,13 @@ Helper ufficiale per evitare path logic duplicata nei notebook: toolkit inspect paths --config dataset.yml --year 2024 --json ``` +Ruoli stabili degli output: + +- `metadata.json`: payload ricco del layer +- `manifest.json`: summary stabile del layer con puntatori a metadata e validation +- `data/_runs/.../.json`: stato del run usato da `status` e `resume` +- `inspect paths --json`: discovery helper read-only per notebook e script locali + Questo mantiene il contratto semplice tra toolkit e repo dataset: - il toolkit produce artefatti e metadata stabili diff --git a/docs/advanced-workflows.md b/docs/advanced-workflows.md index 5bb499a..847af5a 100644 --- a/docs/advanced-workflows.md +++ b/docs/advanced-workflows.md @@ -9,6 +9,12 @@ Percorso canonico: - `toolkit status --dataset --year --latest --config dataset.yml` - notebook locali che leggono output e metadata sotto `root/data/...` +Questa categoria include anche tooling di supporto che non va confuso con il runtime principale del toolkit: + +- `toolkit.profile` +- `resume` +- run parziali per layer + ## Step singoli Utili per debug o per ripetere solo una parte della pipeline: @@ -80,16 +86,6 @@ Regola pratica: `legacy_aliases` resta supportato per compatibilita`, ma non va promosso nei nuovi repo dataset. -## gen-sql - -`toolkit gen-sql --config dataset.yml` resta disponibile come bootstrap helper da mapping dichiarativo. - -Stato raccomandato: - -- supportato -- utile per bootstrap guidato -- non parte del workflow operativo standard -- da considerare congelato: bugfix si`, espansioni solo se emerge uso reale ## Compat legacy diff --git a/docs/config-schema.md b/docs/config-schema.md index ef2492c..121e3b5 100644 --- a/docs/config-schema.md +++ b/docs/config-schema.md @@ -65,8 +65,6 @@ I path relativi sono sempre risolti rispetto alla directory che contiene `datase | `clean.read_mode` | `strict \| fallback \| robust` | `fallback` | | `clean.read_source` | `auto \| config_only \| null` | `null` | | `clean.read` | `CleanRead \| null` | `null` | -| `clean.mapping` | `dict[str, CleanMappingSpec] \| null` | `null` | -| `clean.derive` | `dict[str, CleanDeriveFieldConfig] \| null` | `null` | | `clean.required_columns` | `list[str]` | `[]` | | `clean.validate` | `CleanValidate` | `{}` | @@ -114,22 +112,6 @@ I path relativi sono sempre risolti rispetto alla directory che contiene `datase | `min` | `float \| null` | `null` | | `max` | `float \| null` | `null` | -`CleanMappingSpec.parse` shape minima: - -| Campo | Tipo | Default | -|---|---|---| -| `kind` | `string \| null` | `null` | -| `locale` | `string \| null` | `null` | -| `options` | `dict[string,any] \| null` | `null` | - -`clean.derive` shape minima: - -| Campo | Tipo | Default | -|---|---|---| -| `expr` | `string` | nessuno | - -`clean.mapping.*.parse` e `clean.derive.*` devono essere oggetti YAML, non stringhe o liste. - ## mart | Campo | Tipo | Default | diff --git a/docs/conventions.md b/docs/conventions.md index 7af7ed4..414cf96 100644 --- a/docs/conventions.md +++ b/docs/conventions.md @@ -20,6 +20,7 @@ Questa pagina raccoglie i contratti operativi stabili del toolkit per la pipelin - Il file JSON canonico e` `raw_profile.json`, ma viene scritto solo per policy `standard|debug`. - `profile.json` resta un alias di compatibilita` opzionale, controllato da `output.legacy_aliases`. - `suggested_read.yml` e` il contratto usato da CLEAN per i format hints e resta richiesto solo quando `clean.read.source: auto`. +- `suggested_mapping.yml` resta un artefatto diagnostico opzionale per uso umano; non e` un input del runtime canonico del toolkit. ## Artifacts Policy diff --git a/docs/feature-stability.md b/docs/feature-stability.md index b320c43..fd2dd41 100644 --- a/docs/feature-stability.md +++ b/docs/feature-stability.md @@ -16,9 +16,11 @@ Questa matrice serve a chiarire cosa il toolkit considera percorso canonico, cos | artifact policy `minimal|standard|debug` | supported / advanced | tuning operativo | | `legacy_aliases` | compatibility only | non promuovere nei repo nuovi | | config legacy | compatibility only | usare `--strict-config` nei repo nuovi | -| `gen-sql` | frozen helper | bootstrap guidato, non workflow standard | -| `api_json_paged` | experimental | usare solo con evidenza reale | -| `html_table` | experimental | usare solo con evidenza reale | +Lettura equivalente a livello package: + +- core runtime: `toolkit.raw`, `toolkit.clean`, `toolkit.mart`, `toolkit.cli` (`run`, `validate`, `status`, `inspect`) +- advanced tooling: `toolkit.profile`, `resume`, run parziali +- compatibility only: config legacy e alias storici Regola pratica: diff --git a/docs/notebook-contract.md b/docs/notebook-contract.md index e1170f7..9b4f0ce 100644 --- a/docs/notebook-contract.md +++ b/docs/notebook-contract.md @@ -15,12 +15,23 @@ File utili: - CLEAN: `__clean.parquet`, `manifest.json`, `metadata.json` - MART: `.parquet`, `manifest.json`, `metadata.json` +Ruoli dei file: + +- `metadata.json`: payload ricco del layer. Contiene input, output, `config_hash` e campi specifici del layer. +- `manifest.json`: summary stabile del layer. Punta a metadata e validation e riassume `ok/errors_count/warnings_count`. +- run record in `data/_runs/...`: stato del run (`run_id`, layer, validations, status), utile per `status` e `resume`. +- `inspect paths --json`: helper read-only per notebook e script locali; restituisce i path assoluti utili del runtime, incluso `latest_run`. + Per evitare duplicazione di path logic nei notebook: - leggi `dataset.yml` - usa `toolkit inspect paths --config dataset.yml --year --json` - poi apri parquet, metadata, manifest, validation e run record dai path restituiti +Nota pratica: + +- `inspect paths` restituisce path assoluti della macchina locale: e' pensato per notebook e script nello stesso ambiente, non come formato portabile tra macchine diverse. + Regola pratica: - il toolkit produce diff --git a/docs/runtime-boundaries.md b/docs/runtime-boundaries.md new file mode 100644 index 0000000..a3c64f5 --- /dev/null +++ b/docs/runtime-boundaries.md @@ -0,0 +1,39 @@ +# Runtime Boundaries + +Questa nota chiarisce quali parti del package `toolkit/` fanno davvero parte del runtime principale e quali no. + +## Core Runtime + +Queste aree definiscono il contratto stabile del toolkit: + +- `toolkit.raw` +- `toolkit.clean` +- `toolkit.mart` +- `toolkit.cli` per `run`, `validate`, `status`, `inspect` +- `toolkit.core` per config, path, metadata, run tracking e validation + +Sono le superfici che i repo dataset e il `project-template` dovrebbero considerare centrali. + +## Advanced Tooling + +Queste aree restano supportate, ma non fanno parte del percorso canonico: + +- `toolkit.profile` +- `toolkit.cli.cmd_resume` +- `toolkit.cli.cmd_profile` +- esecuzione parziale `run raw|clean|mart` + +Servono per recovery, diagnostica e casi sporchi, non come baseline per i repo nuovi. + +## Compatibility Only + +La compatibilita' mantenuta dal toolkit riguarda soprattutto config legacy, alias documentati e alcune superfici CLI storiche. + +Non va trattata come parte del contratto stabile per i repo nuovi. + +## Builtin Sources + +Le sorgenti builtin supportate dal runtime canonico sono: + +- `local_file` +- `http_file` diff --git a/pytest.ini b/pytest.ini index a635c5c..848bc8f 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,6 @@ [pytest] pythonpath = . +markers = + core: release-critical tests for the public contract and canonical workflow + advanced: supported but non-happy-path or secondary engine behavior + compat: compatibility and legacy-shim coverage diff --git a/scripts/usage_matrix.py b/scripts/usage_matrix.py index 6322828..17e7b2a 100644 --- a/scripts/usage_matrix.py +++ b/scripts/usage_matrix.py @@ -5,8 +5,6 @@ TARGETS = { - "toolkit.core.validators": "toolkit.core.validators", - "toolkit.core.validation_summary": "toolkit.core.validation_summary", "dataset.test.yml": "dataset.test.yml", "clean_ispra_test.sql": "clean_ispra_test.sql", } diff --git a/smoke/README.md b/smoke/README.md index 2b93223..86d1b5f 100644 --- a/smoke/README.md +++ b/smoke/README.md @@ -8,13 +8,14 @@ Progetti inclusi: - `smoke/local_file_csv`: `local_file` completamente offline - `smoke/zip_http_csv`: `http_file` + extractor ZIP (`unzip_first_csv`) contro server locale - `smoke/bdap_http_csv`: `http_file` contro CSV pubblico BDAP +- `smoke/finanze_http_zip_2023`: `http_file` contro ZIP pubblico reale, best-effort Ogni progetto include: - `dataset.yml` minimo - `sql/clean.sql` - `sql/mart/mart_ok.sql` -- `README.md` con i comandi `toolkit run raw/profile/clean/mart/status` +- `README.md` con i comandi del caso smoke, incluso un `--dry-run --strict-config` iniziale Prerequisito: diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..d8b8e67 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import pytest + + +CORE_TESTS = { + "test_cli_all_commands.py", + "test_cli_inspect_paths.py", + "test_cli_path_contract.py", + "test_cli_resume.py", + "test_cli_status.py", + "test_config.py", + "test_metadata_hash.py", + "test_paths.py", + "test_project_example_e2e.py", + "test_run_context.py", + "test_run_dry_run.py", + "test_run_validation_gate.py", + "test_smoke_tiny_e2e.py", + "test_validate_layers.py", + "test_validate_rules.py", +} + +ADVANCED_TESTS = { + "test_artifacts_policy.py", + "test_clean_csv_columns.py", + "test_clean_duckdb_read.py", + "test_clean_input_selection.py", + "test_extractors.py", + "test_logging_context.py", + "test_profile_sniff.py", + "test_raw_ext_inference.py", + "test_raw_profile_hints.py", + "test_registry.py", +} + +COMPAT_TESTS = { + "test_deprecated_shims.py", +} + + +def pytest_collection_modifyitems(items: list[pytest.Item]) -> None: + for item in items: + name = item.path.name + if name in CORE_TESTS: + item.add_marker(pytest.mark.core) + elif name in ADVANCED_TESTS: + item.add_marker(pytest.mark.advanced) + elif name in COMPAT_TESTS: + item.add_marker(pytest.mark.compat) diff --git a/tests/test_clean_input_selection.py b/tests/test_clean_input_selection.py index 56aa595..4fca53f 100644 --- a/tests/test_clean_input_selection.py +++ b/tests/test_clean_input_selection.py @@ -9,7 +9,7 @@ from toolkit.clean.input_selection import list_raw_candidates, select_inputs from toolkit.clean.run import run_clean -from toolkit.core.manifest import write_manifest +from toolkit.core.manifest import write_raw_manifest class _NoopLogger: @@ -139,7 +139,7 @@ def test_run_clean_uses_manifest_primary(tmp_path: Path, monkeypatch): selected_file.write_text("a\n1\n", encoding="utf-8") other_file = raw_dir / "other.csv" other_file.write_text("a\n2\n", encoding="utf-8") - write_manifest( + write_raw_manifest( raw_dir, { "dataset": "demo", @@ -342,7 +342,7 @@ def test_clean_manifest_points_missing_file_falls_back_and_warns(tmp_path: Path, raw_dir = tmp_path / "data" / "raw" / "demo" / "2024" _write_csv(raw_dir / "small.csv", "a\n1\n") large_file = _write_csv(raw_dir / "large.csv", "a\n" + ("1\n" * 20)) - write_manifest( + write_raw_manifest( raw_dir, { "dataset": "demo", diff --git a/tests/test_config.py b/tests/test_config.py index 8e820b8..defa021 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -774,47 +774,6 @@ def test_load_config_model_rejects_unknown_section_keys_in_strict_mode( """.strip(), "raw.extractor.args", ), - ( - """ -dataset: - name: demo - years: [2022] -raw: {} -clean: - derive: [] -mart: {} -""".strip(), - "clean.derive", - ), - ( - """ -dataset: - name: demo - years: [2022] -raw: {} -clean: - derive: - total: "value * 2" -mart: {} -""".strip(), - "clean.derive.total", - ), - ( - """ -dataset: - name: demo - years: [2022] -raw: {} -clean: - mapping: - totale: - from: valore - type: float - parse: "number_it" -mart: {} -""".strip(), - "clean.mapping.totale.parse", - ), ], ) def test_load_config_model_rejects_wrong_shape_for_typed_subsections( diff --git a/tests/test_deprecated_shims.py b/tests/test_deprecated_shims.py deleted file mode 100644 index 1269021..0000000 --- a/tests/test_deprecated_shims.py +++ /dev/null @@ -1,28 +0,0 @@ -from __future__ import annotations - -import importlib -import sys - -import pytest - - -@pytest.mark.parametrize( - ("module_name", "message"), - [ - ( - "toolkit.core.validators", - "Deprecated import path 'toolkit.core.validators'; use 'toolkit.core.validation' instead; " - "will be removed in v0.5.", - ), - ( - "toolkit.core.validation_summary", - "Deprecated import path 'toolkit.core.validation_summary'; use 'toolkit.core.validation' instead; " - "will be removed in v0.5.", - ), - ], -) -def test_importing_compatibility_shim_emits_deprecation_warning(module_name: str, message: str) -> None: - sys.modules.pop(module_name, None) - - with pytest.warns(DeprecationWarning, match=message): - importlib.import_module(module_name) diff --git a/tests/test_project_example_e2e.py b/tests/test_project_example_e2e.py index 1b5c56d..f3dc4df 100644 --- a/tests/test_project_example_e2e.py +++ b/tests/test_project_example_e2e.py @@ -105,6 +105,12 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch): assert (raw_dir / raw_manifest["primary_output_file"]).exists() assert raw_manifest["sources"] assert raw_manifest["outputs"] + assert raw_meta["profile_hints"]["file_used"] == raw_manifest["primary_output_file"] + assert raw_meta["profile_hints"]["encoding_suggested"] == "utf-8" + assert raw_meta["profile_hints"]["delim_suggested"] == ";" + assert raw_meta["profile_hints"]["columns_preview"] + assert raw_meta["profile_hints"]["columns_preview"][0] == "Regione" + assert any("Provincia" in column for column in raw_meta["profile_hints"]["columns_preview"]) assert clean_meta["input_files"] == [Path(raw_manifest["primary_output_file"]).name] assert clean_meta["read_source_used"] in {"strict", "robust", "parquet"} assert isinstance(clean_meta["read_params_used"], dict) diff --git a/tests/test_raw_ext_inference.py b/tests/test_raw_ext_inference.py index b3ef258..d8eb3d2 100644 --- a/tests/test_raw_ext_inference.py +++ b/tests/test_raw_ext_inference.py @@ -1,6 +1,6 @@ from pathlib import Path -from toolkit.core.manifest import read_manifest +from toolkit.core.manifest import read_raw_manifest from toolkit.raw.run import _infer_ext, run_raw @@ -92,7 +92,7 @@ def _fake_fetch_payload(_stype: str, _client: dict, _formatted_args: dict): run_raw("demo", 2024, str(tmp_path), raw_cfg, _NoopLogger(), run_id="run-123") out_dir = tmp_path / "data" / "raw" / "demo" / "2024" - manifest = read_manifest(out_dir) + manifest = read_raw_manifest(out_dir) assert manifest is not None assert manifest["dataset"] == "demo" assert manifest["year"] == 2024 @@ -122,7 +122,7 @@ def _fake_fetch_payload(_stype: str, _client: dict, _formatted_args: dict): run_raw("demo", 2024, str(tmp_path), raw_cfg, _NoopLogger(), run_id="run-2") out_dir = tmp_path / "data" / "raw" / "demo" / "2024" - manifest = read_manifest(out_dir) + manifest = read_raw_manifest(out_dir) assert manifest is not None assert manifest["run_id"] == "run-2" assert manifest["primary_output_file"] == "file_1.csv" @@ -151,7 +151,7 @@ def _fake_fetch_payload(_stype: str, _client: dict, _formatted_args: dict): run_raw("demo", 2024, str(tmp_path), raw_cfg, _NoopLogger(), run_id="run-2") out_dir = tmp_path / "data" / "raw" / "demo" / "2024" - manifest = read_manifest(out_dir) + manifest = read_raw_manifest(out_dir) assert manifest is not None assert manifest["run_id"] == "run-2" assert manifest["primary_output_file"] == "file.csv" @@ -184,7 +184,7 @@ def _fake_fetch_payload(_stype: str, _client: dict, formatted_args: dict): run_raw("demo", 2024, str(tmp_path), raw_cfg, _NoopLogger(), run_id="run-123") out_dir = tmp_path / "data" / "raw" / "demo" / "2024" - manifest = read_manifest(out_dir) + manifest = read_raw_manifest(out_dir) assert manifest is not None assert manifest["sources"] == [ {"name": "alpha", "output_file": "a.csv"}, diff --git a/tests/test_raw_profile_hints.py b/tests/test_raw_profile_hints.py new file mode 100644 index 0000000..ec4d568 --- /dev/null +++ b/tests/test_raw_profile_hints.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from pathlib import Path + +from toolkit.profile.raw import build_profile_hints + + +def test_build_profile_hints_for_standard_csv(tmp_path: Path) -> None: + csv_path = tmp_path / "sample.csv" + csv_path.write_text("id,name,value\n1,Alice,10\n2,Bob,20\n", encoding="utf-8") + + hints = build_profile_hints(csv_path) + + assert hints["file_used"] == "sample.csv" + assert hints["encoding_suggested"] == "utf-8" + assert hints["delim_suggested"] == "," + assert hints["decimal_suggested"] is None + assert hints["skip_suggested"] == 0 + assert hints["header_line"] == "id,name,value" + assert hints["columns_preview"] == ["id", "name", "value"] + assert hints["warnings"] == [] + + +def test_build_profile_hints_detects_preamble_line(tmp_path: Path) -> None: + csv_path = tmp_path / "preamble.csv" + csv_path.write_text( + "Applied filters: year is 2024\nid;name;value\n1;Alice;10\n", + encoding="utf-8", + ) + + hints = build_profile_hints(csv_path) + + assert hints["delim_suggested"] == ";" + assert hints["skip_suggested"] == 1 + assert hints["header_line"] == "id;name;value" + assert hints["columns_preview"] == ["id", "name", "value"] + assert any("header_preamble_detected" in warning for warning in hints["warnings"]) diff --git a/tests/test_registry.py b/tests/test_registry.py index 3f2659a..757e230 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,9 +1,6 @@ -import importlib -import logging - import pytest -from toolkit.core.registry import PluginRegistrationError, Registry, register_builtin_plugins +from toolkit.core.registry import Registry, register_builtin_plugins def test_registry_register_and_create(): @@ -26,44 +23,3 @@ def test_register_builtin_plugins_registers_present_plugins(): plugins = r.list_plugins() assert "http_file" in plugins assert "local_file" in plugins - - -def test_register_builtin_plugins_warns_for_optional_missing_plugin_in_non_strict( - monkeypatch, caplog, capsys -): - r = Registry() - real_import = importlib.import_module - - def _fake_import(name: str, package=None): - if name == "toolkit.plugins.html_table": - raise ImportError("missing optional dependency") - return real_import(name, package) - - monkeypatch.setattr("toolkit.core.registry.importlib.import_module", _fake_import) - - with caplog.at_level(logging.WARNING, logger="toolkit.core.registry"): - register_builtin_plugins(registry_obj=r, strict=False) - - captured = caplog.text + capsys.readouterr().out - assert "DCLPLUGIN001" in captured - assert "html_table" in captured - assert "html_table" not in r.list_plugins() - assert "http_file" in r.list_plugins() - - -def test_register_builtin_plugins_errors_for_optional_missing_plugin_in_strict(monkeypatch): - r = Registry() - real_import = importlib.import_module - - def _fake_import(name: str, package=None): - if name == "toolkit.plugins.html_table": - raise ImportError("missing optional dependency") - return real_import(name, package) - - monkeypatch.setattr("toolkit.core.registry.importlib.import_module", _fake_import) - - with pytest.raises(PluginRegistrationError) as exc: - register_builtin_plugins(registry_obj=r, strict=True) - - assert "DCLPLUGIN001" in str(exc.value) - assert "html_table" in str(exc.value) diff --git a/toolkit/clean/duckdb_read.py b/toolkit/clean/duckdb_read.py index 3dada79..12fbd6a 100644 --- a/toolkit/clean/duckdb_read.py +++ b/toolkit/clean/duckdb_read.py @@ -28,6 +28,40 @@ class ReadInfo: params_used: dict[str, Any] +def _read_source_mode(clean_cfg: dict[str, Any], logger=None) -> tuple[str, dict[str, Any]]: + raw_read_cfg = clean_cfg.get("read") + read_source = clean_cfg.get("read_source") + explicit_cfg: dict[str, Any] = {} + + if raw_read_cfg is None: + pass + elif isinstance(raw_read_cfg, str): + if logger is not None: + logger.warning("clean.read scalar form is deprecated; use clean.read.source") + read_source = raw_read_cfg + elif isinstance(raw_read_cfg, dict): + explicit_cfg = dict(raw_read_cfg) + nested_source = explicit_cfg.pop("source", None) + if nested_source is not None: + read_source = nested_source + else: + raise ValueError("clean.read must be either a mapping (dict) or one of: auto, config_only") + + normalized_source = str(read_source or "auto") + if normalized_source not in READ_SOURCE_MODES: + raise ValueError("clean.read source must be one of: auto, config_only") + + return normalized_source, explicit_cfg + + +def _split_read_cfg(explicit_cfg: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]: + selection_cfg = dict(explicit_cfg) + relation_overrides = { + key: value for key, value in explicit_cfg.items() if key not in READ_SELECTION_KEYS + } + return selection_cfg, relation_overrides + + def load_suggested_read(raw_year_dir: Path) -> dict[str, Any] | None: suggested_path = raw_year_dir / "_profile" / "suggested_read.yml" if not suggested_path.exists(): @@ -57,34 +91,8 @@ def resolve_clean_read_cfg( clean_cfg: dict[str, Any], logger=None, ) -> tuple[dict[str, Any], dict[str, Any], list[str]]: - raw_read_cfg = clean_cfg.get("read") - read_source = clean_cfg.get("read_source") - explicit_cfg: dict[str, Any] = {} - - if raw_read_cfg is None: - pass - elif isinstance(raw_read_cfg, str): - if logger is not None: - logger.warning( - "clean.read scalar form is deprecated; use clean.read.source" - ) - read_source = raw_read_cfg - elif isinstance(raw_read_cfg, dict): - explicit_cfg = dict(raw_read_cfg) - nested_source = explicit_cfg.pop("source", None) - if nested_source is not None: - read_source = nested_source - else: - raise ValueError("clean.read must be either a mapping (dict) or one of: auto, config_only") - - normalized_source = str(read_source or "auto") - if normalized_source not in READ_SOURCE_MODES: - raise ValueError("clean.read source must be one of: auto, config_only") - - selection_cfg = dict(explicit_cfg) - relation_overrides = { - key: value for key, value in explicit_cfg.items() if key not in READ_SELECTION_KEYS - } + normalized_source, explicit_cfg = _read_source_mode(clean_cfg, logger) + selection_cfg, relation_overrides = _split_read_cfg(explicit_cfg) suggested_cfg = load_suggested_read(raw_year_dir) filtered_suggested = filter_suggested_read(suggested_cfg) @@ -114,6 +122,8 @@ def sql_path(p: Path) -> str: def quote_list(paths: list[Path]) -> str: return ", ".join([f"'{sql_path(p)}'" for p in paths]) + + def csv_trim_projection(columns: dict[str, str]) -> str: exprs: list[str] = [] for name, dtype in columns.items(): @@ -247,64 +257,91 @@ def _execute_csv_read( return params_used -def read_raw_to_relation( +def _execute_parquet_read( con: duckdb.DuckDBPyConnection, input_files: list[Path], - params: dict[str, Any] | None, - mode: str, - logger, ) -> ReadInfo: - read_cfg = normalize_read_cfg(params) - if not input_files: - raise FileNotFoundError( - "No supported input files found for CLEAN " - f"(expected one of: {sorted(SUPPORTED_INPUT_EXTS)})." + if len(input_files) == 1: + con.execute( + f"CREATE VIEW raw_input AS " + f"SELECT * FROM read_parquet('{sql_path(input_files[0])}');" ) + else: + paths = quote_list(input_files) + con.execute( + f"CREATE VIEW raw_input AS " + f"SELECT * FROM read_parquet([{paths}]);" + ) + return ReadInfo(source="parquet", params_used={}) - exts = {p.suffix.lower() for p in input_files} - if exts <= {".parquet"}: - if len(input_files) == 1: - con.execute( - f"CREATE VIEW raw_input AS " - f"SELECT * FROM read_parquet('{sql_path(input_files[0])}');" - ) - else: - paths = quote_list(input_files) - con.execute( - f"CREATE VIEW raw_input AS " - f"SELECT * FROM read_parquet([{paths}]);" - ) - logger.info("read_csv params used: source=parquet params={}") - return ReadInfo(source="parquet", params_used={}) +def _validate_read_mode(mode: str) -> str: normalized_mode = str(mode or "fallback") if normalized_mode not in {"strict", "fallback", "robust"}: raise ValueError("clean.read_mode must be one of: strict, fallback, robust") + return normalized_mode - try: - if normalized_mode == "robust": - robust_cfg = robust_preset(read_cfg) - params_used = _execute_csv_read(con, input_files, robust_cfg) - logger.info( - "read_csv params used: source=robust params=%s", - json.dumps(params_used, ensure_ascii=False, sort_keys=True), - ) - return ReadInfo(source="robust", params_used=params_used) - params_used = _execute_csv_read(con, input_files, read_cfg) - logger.info( - "read_csv params used: source=strict params=%s", - json.dumps(params_used, ensure_ascii=False, sort_keys=True), +def _read_failure_message( + *, + input_file: Path, + read_cfg: dict[str, Any], +) -> str: + return ( + "Failed to read CLEAN CSV input. " + f"selected_input={input_file} " + f"read_cfg={json.dumps(read_cfg, ensure_ascii=False, sort_keys=True)}. " + "Try setting clean.read.columns or clean.read.source, " + "or adjusting quote/escape/comment/ignore_errors" + ) + + +def _execute_csv_mode( + con: duckdb.DuckDBPyConnection, + input_files: list[Path], + read_cfg: dict[str, Any], + *, + source: str, + logger, +) -> ReadInfo: + params_used = _execute_csv_read(con, input_files, read_cfg) + logger.info( + "read_csv params used: source=%s params=%s", + source, + json.dumps(params_used, ensure_ascii=False, sort_keys=True), + ) + return ReadInfo(source=source, params_used=params_used) + + +def _read_csv_relation( + con: duckdb.DuckDBPyConnection, + input_files: list[Path], + read_cfg: dict[str, Any], + *, + mode: str, + logger, +) -> ReadInfo: + if mode == "robust": + return _execute_csv_mode( + con, + input_files, + robust_preset(read_cfg), + source="robust", + logger=logger, + ) + + try: + return _execute_csv_mode( + con, + input_files, + read_cfg, + source="strict", + logger=logger, ) - return ReadInfo(source="strict", params_used=params_used) except Exception as exc: - if normalized_mode == "strict": + if mode == "strict": raise ValueError( - "Failed to read CLEAN CSV input. " - f"selected_input={input_files[0]} " - f"read_cfg={json.dumps(read_cfg, ensure_ascii=False, sort_keys=True)}. " - "Try setting clean.read.columns or clean.read.source, " - "or adjusting quote/escape/comment/ignore_errors" + _read_failure_message(input_file=input_files[0], read_cfg=read_cfg) ) from exc short_msg = f"{type(exc).__name__}: {exc}" @@ -315,17 +352,44 @@ def read_raw_to_relation( ) robust_cfg = robust_preset(read_cfg) try: - params_used = _execute_csv_read(con, input_files, robust_cfg) - logger.info( - "read_csv params used: source=robust params=%s", - json.dumps(params_used, ensure_ascii=False, sort_keys=True), + return _execute_csv_mode( + con, + input_files, + robust_cfg, + source="robust", + logger=logger, ) - return ReadInfo(source="robust", params_used=params_used) except Exception as robust_exc: raise ValueError( - "Failed to read CLEAN CSV input. " - f"selected_input={input_files[0]} " - f"read_cfg={json.dumps(robust_cfg, ensure_ascii=False, sort_keys=True)}. " - "Try setting clean.read.columns or clean.read.source, " - "or adjusting quote/escape/comment/ignore_errors" + _read_failure_message(input_file=input_files[0], read_cfg=robust_cfg) ) from robust_exc + + +def read_raw_to_relation( + con: duckdb.DuckDBPyConnection, + input_files: list[Path], + params: dict[str, Any] | None, + mode: str, + logger, +) -> ReadInfo: + read_cfg = normalize_read_cfg(params) + if not input_files: + raise FileNotFoundError( + "No supported input files found for CLEAN " + f"(expected one of: {sorted(SUPPORTED_INPUT_EXTS)})." + ) + + exts = {p.suffix.lower() for p in input_files} + if exts <= {".parquet"}: + info = _execute_parquet_read(con, input_files) + logger.info("read_csv params used: source=parquet params={}") + return info + + normalized_mode = _validate_read_mode(mode) + return _read_csv_relation( + con, + input_files, + read_cfg, + mode=normalized_mode, + logger=logger, + ) diff --git a/toolkit/clean/generator.py b/toolkit/clean/generator.py deleted file mode 100644 index 4204736..0000000 --- a/toolkit/clean/generator.py +++ /dev/null @@ -1,189 +0,0 @@ -from __future__ import annotations - -from typing import Any, Dict, Optional - -ALLOWED_TYPES = {"int", "integer", "float", "double", "str", "string", "date"} - - -def sql_ident(name: str) -> str: - return f'"{name}"' - - -def apply_nullify(expr: str, nullify: Optional[list[str]]) -> str: - if not nullify: - return expr - toks = ", ".join([f"'{t}'" for t in nullify]) - return f"CASE WHEN TRIM(CAST({expr} AS VARCHAR)) IN ({toks}) THEN NULL ELSE {expr} END" - - -def apply_replace(expr: str, replace: Optional[dict[str, str]]) -> str: - if not replace: - return expr - out = expr - for k, v in replace.items(): - out = f"REPLACE({out}, '{k}', '{v}')" - return out - - -def apply_normalize(expr: str, normalize: Optional[list[str]]) -> str: - if not normalize: - return expr - out = expr - for op in normalize: - if op == "trim": - out = f"TRIM({out})" - elif op == "upper": - out = f"UPPER({out})" - elif op == "lower": - out = f"LOWER({out})" - elif op == "title": - out = f"INITCAP({out})" - elif op == "collapse_spaces": - out = f"REGEXP_REPLACE({out}, '\\s+', ' ')" - elif op == "remove_accents": - # Placeholder: no-op for now (future: UDF) - out = out - else: - raise ValueError(f"Unknown normalize op: {op}") - return out - - -def _parse_number_it(expr: str) -> str: - # "1.234,56" -> 1234.56 - return f"REPLACE(REPLACE({expr}, '.', ''), ',', '.')" - - -def _parse_percent_it(expr: str) -> str: - # "12,3%" -> 12.3 - return _parse_number_it(f"REPLACE({expr}, '%', '')") - - -def apply_parse(expr: str, parse: Optional[dict[str, Any]]) -> str: - if not parse: - return expr - kind = parse.get("kind") - if kind == "number_it": - return _parse_number_it(expr) - if kind == "percent_it": - return _parse_percent_it(expr) - raise ValueError(f"Unknown parse kind: {kind}") - - -def apply_cast(expr: str, target_type: str) -> str: - t = target_type.lower() - if t in ("int", "integer"): - return f"CAST({expr} AS INTEGER)" - if t in ("float", "double"): - return f"CAST({expr} AS DOUBLE)" - if t in ("str", "string"): - return f"CAST({expr} AS VARCHAR)" - if t == "date": - return f"CAST({expr} AS DATE)" - raise ValueError(f"Unknown target type: {target_type}") - - -def _validate_mapping(mapping: Dict[str, Dict[str, Any]]) -> None: - if not isinstance(mapping, dict) or not mapping: - raise ValueError("clean.mapping must be a non-empty dict") - - for out_col, spec in mapping.items(): - if not isinstance(out_col, str) or not out_col.strip(): - raise ValueError("Mapping keys (output columns) must be non-empty strings") - if not isinstance(spec, dict): - raise ValueError(f"Mapping for '{out_col}' must be a dict") - - src = spec.get("from") - if not isinstance(src, str) or not src.strip(): - raise ValueError(f"Mapping for '{out_col}' missing/invalid 'from'") - - t = str(spec.get("type", "str")).lower() - if t not in ALLOWED_TYPES: - raise ValueError( - f"Mapping for '{out_col}' has invalid type '{t}' (allowed: {sorted(ALLOWED_TYPES)})" - ) - - if (norm := spec.get("normalize")) is not None and not isinstance(norm, list): - raise ValueError(f"normalize for '{out_col}' must be a list") - - if (nullify := spec.get("nullify")) is not None and not isinstance(nullify, list): - raise ValueError(f"nullify for '{out_col}' must be a list") - - if (replace := spec.get("replace")) is not None and not isinstance(replace, dict): - raise ValueError(f"replace for '{out_col}' must be a dict") - - if (parse := spec.get("parse")) is not None and not isinstance(parse, dict): - raise ValueError(f"parse for '{out_col}' must be a dict") - - -def _field_expr(out_col: str, spec: Dict[str, Any]) -> str: - src = spec["from"] - expr = sql_ident(src) - - # replace -> nullify - expr = apply_replace(expr, spec.get("replace")) - expr = apply_nullify(expr, spec.get("nullify")) - - t = str(spec.get("type", "str")).lower() - - # Strings: cast + normalize - if t in ("str", "string"): - expr = apply_cast(expr, "str") - expr = apply_normalize(expr, spec.get("normalize")) - expr = apply_cast(expr, "str") - return f"{expr} AS {out_col}" - - # Non-strings: parse from trimmed string - expr = f"TRIM(CAST({expr} AS VARCHAR))" - expr = apply_parse(expr, spec.get("parse")) - expr = apply_cast(expr, t) - return f"{expr} AS {out_col}" - - -def generate_clean_sql( - dataset: str, - year: int, - mapping: Dict[str, Dict[str, Any]], - derive: Optional[Dict[str, Dict[str, Any]]] = None, -) -> str: - """ - Generate a CLEAN SQL query (NO trailing semicolon) that reads from view `raw_input`. - """ - _validate_mapping(mapping) - - lines: list[str] = [] - lines.append("-- AUTO-GENERATED BY toolkit.clean.generator") - lines.append(f"-- dataset: {dataset}") - lines.append(f"-- year: {year}") - lines.append("-- source view: raw_input") - lines.append("") - - lines.append("WITH src AS (") - lines.append(" SELECT * FROM raw_input") - lines.append("), mapped AS (") - lines.append(" SELECT") - - fields = [] - for out_col, spec in mapping.items(): - fields.append(" " + _field_expr(out_col, spec)) - lines.append(",\n".join(fields)) - lines.append(" FROM src") - lines.append(")") - - if derive: - lines.append(", derived AS (") - lines.append(" SELECT") - lines.append(" *,") - derived_cols = [] - for k, spec in derive.items(): - expr = spec.get("expr") - if not isinstance(expr, str) or not expr.strip(): - raise ValueError(f"derive.{k} missing/invalid expr") - derived_cols.append(f" ({expr}) AS {k}") - lines.append(",\n".join(derived_cols)) - lines.append(" FROM mapped") - lines.append(")") - lines.append("SELECT * FROM derived") - else: - lines.append("SELECT * FROM mapped") - - return "\n".join(lines) + "\n" diff --git a/toolkit/clean/input_selection.py b/toolkit/clean/input_selection.py index b8c0ae0..9ec0316 100644 --- a/toolkit/clean/input_selection.py +++ b/toolkit/clean/input_selection.py @@ -3,7 +3,7 @@ import json from pathlib import Path -from toolkit.core.manifest import read_manifest +from toolkit.core.manifest import read_raw_manifest from toolkit.core.paths import from_root_relative, layer_year_dir, resolve_root from toolkit.core.run_context import get_run_dir, list_runs @@ -31,12 +31,16 @@ def list_input_files(raw_dir: Path, glob: str = "*") -> list[Path]: ) +def _sorted_unique_paths(paths: list[Path]) -> list[Path]: + return sorted(set(paths), key=lambda item: item.name.lower()) + + def _match_patterns(paths: list[Path], patterns: list[str]) -> list[Path]: matched: list[Path] = [] for path in paths: if any(path.match(pattern) or path.name == pattern for pattern in patterns): matched.append(path) - return sorted(set(matched), key=lambda item: item.name.lower()) + return _sorted_unique_paths(matched) def _metadata_candidates(raw_dir: Path) -> list[Path]: @@ -54,14 +58,36 @@ def _metadata_candidates(raw_dir: Path) -> list[Path]: candidates: list[Path] = [] for name in file_names: candidate = raw_dir / name - if ( - candidate.exists() - and candidate.is_file() - and is_supported_input_file(candidate) - and candidate.stat().st_size > 0 - ): + if _is_usable_input_file(candidate): candidates.append(candidate) - return sorted(set(candidates), key=lambda item: item.name.lower()) + return _sorted_unique_paths(candidates) + + +def _is_usable_input_file(path: Path) -> bool: + return ( + path.exists() + and path.is_file() + and is_supported_input_file(path) + and path.stat().st_size > 0 + ) + + +def _run_record_paths(root: str | None, dataset: str, year: int) -> list[Path]: + return list_runs(get_run_dir(resolve_root(root), dataset, year)) + + +def _raw_layer_succeeded(run_path: Path) -> bool: + record = json.loads(run_path.read_text(encoding="utf-8")) + raw_layer = (record.get("layers") or {}).get("raw") or {} + return raw_layer.get("status") == "SUCCESS" + + +def _latest_raw_success_exists(root: str | None, dataset: str, year: int) -> bool: + runs = _run_record_paths(root, dataset, year) + for run_path in sorted(runs, key=lambda item: item.stat().st_mtime, reverse=True): + if _raw_layer_succeeded(run_path): + return True + return False def list_raw_candidates( @@ -81,19 +107,7 @@ def list_raw_candidates( if not prefer_from_raw_run: return candidates - runs = list_runs(get_run_dir(resolve_root(root), dataset, year)) - if not runs: - return candidates - - latest_raw_success = False - for run_path in sorted(runs, key=lambda item: item.stat().st_mtime, reverse=True): - record = json.loads(run_path.read_text(encoding="utf-8")) - raw_layer = (record.get("layers") or {}).get("raw") or {} - if raw_layer.get("status") == "SUCCESS": - latest_raw_success = True - break - - if not latest_raw_success: + if not _latest_raw_success_exists(root, dataset, year): return candidates metadata_based = _metadata_candidates(raw_dir) @@ -104,6 +118,26 @@ def list_raw_candidates( return filtered or candidates +def _manifest_primary_input(raw_year_dir: Path) -> tuple[Path | None, str | None]: + manifest = read_raw_manifest(raw_year_dir) + if not manifest: + return None, "CLEAN RAW manifest missing, using legacy selection." + + primary_output_file = manifest.get("primary_output_file") + if not isinstance(primary_output_file, str) or not primary_output_file.strip(): + return None, "CLEAN RAW manifest missing primary_output_file; using legacy selection." + + manifest_path = from_root_relative(primary_output_file, raw_year_dir) + if _is_usable_input_file(manifest_path): + return manifest_path, None + + return ( + None, + "CLEAN RAW manifest primary_output_file is missing or invalid: " + f"{primary_output_file}; using legacy selection.", + ) + + def select_raw_input( raw_year_dir: Path, logger, @@ -131,28 +165,11 @@ def select_raw_input( prefer_from_raw_run=prefer_from_raw_run, ) - manifest = read_manifest(raw_year_dir) - if manifest: - primary_output_file = manifest.get("primary_output_file") - if isinstance(primary_output_file, str) and primary_output_file.strip(): - manifest_path = from_root_relative(primary_output_file, raw_year_dir) - if ( - manifest_path.exists() - and manifest_path.is_file() - and is_supported_input_file(manifest_path) - and manifest_path.stat().st_size > 0 - ): - return [manifest_path] - logger.warning( - "CLEAN RAW manifest primary_output_file is missing or invalid: %s; using legacy selection.", - primary_output_file, - ) - else: - logger.warning( - "CLEAN RAW manifest missing primary_output_file; using legacy selection." - ) - else: - logger.warning("CLEAN RAW manifest missing, using legacy selection.") + manifest_primary, manifest_warning = _manifest_primary_input(raw_year_dir) + if manifest_primary is not None: + return [manifest_primary] + if manifest_warning is not None: + logger.warning(manifest_warning) selected = select_inputs( selected_candidates, @@ -196,12 +213,10 @@ def select_inputs( return selected if mode == "latest": - latest = max(candidates, key=lambda item: (item.stat().st_mtime, item.name.lower())) - return [latest] + return [max(candidates, key=lambda item: (item.stat().st_mtime, item.name.lower()))] if mode == "largest": - largest = max(candidates, key=lambda item: (item.stat().st_size, item.name.lower())) - return [largest] + return [max(candidates, key=lambda item: (item.stat().st_size, item.name.lower()))] if mode == "all": return candidates diff --git a/toolkit/clean/run.py b/toolkit/clean/run.py index cc13afc..5ebbaf4 100644 --- a/toolkit/clean/run.py +++ b/toolkit/clean/run.py @@ -13,7 +13,7 @@ ) from toolkit.clean.input_selection import select_raw_input from toolkit.core.artifacts import ARTIFACT_POLICY_DEBUG, resolve_artifact_policy, should_write -from toolkit.core.metadata import config_hash_for_year, file_record, write_manifest, write_metadata +from toolkit.core.metadata import config_hash_for_year, file_record, write_layer_manifest, write_metadata from toolkit.core.paths import layer_year_dir, resolve_root, to_root_relative from toolkit.core.template import render_template @@ -35,72 +35,48 @@ def _resolve_sql_path(sql_ref: str | Path, *, base_dir: Path | None) -> Path: return base_dir / path -def _run_sql( - input_files: list[Path], - sql_query: str, - output_path: Path, +def _load_clean_sql( + clean_cfg: dict[str, Any], *, - read_cfg: dict[str, Any] | None = None, - read_mode: str = "fallback", - logger=None, -) -> tuple[str, dict[str, Any]]: - con = duckdb.connect(":memory:") - try: - read_info = read_raw_to_relation(con, input_files, read_cfg, read_mode, logger) - con.execute(f"CREATE TABLE clean_out AS {sql_query}") - output_path.parent.mkdir(parents=True, exist_ok=True) - con.execute( - f"COPY clean_out TO '{sql_path(output_path)}' (FORMAT PARQUET);" - ) - return read_info.source, read_info.params_used - finally: - con.close() - - -def run_clean( dataset: str, year: int, - root: str | None, - clean_cfg: dict, - logger, - *, - base_dir: Path | None = None, - output_cfg: dict[str, Any] | None = None, -): - policy = resolve_artifact_policy(output_cfg) - root_dir = resolve_root(root) - raw_dir = layer_year_dir(root, "raw", dataset, year) - out_dir = layer_year_dir(root, "clean", dataset, year) - out_dir.mkdir(parents=True, exist_ok=True) - - if not raw_dir.exists(): - raise FileNotFoundError(f"RAW dir not found: {raw_dir}. Run: toolkit run raw -c dataset.yml") - - read_mode = str(clean_cfg.get("read_mode", "fallback")) - read_cfg, relation_read_cfg, read_params_source = resolve_clean_read_cfg(raw_dir, clean_cfg, logger) - selection_mode = read_cfg.get("mode") - glob_pattern = read_cfg.get("glob", "*") - prefer_from_raw_run = bool(read_cfg.get("prefer_from_raw_run", True)) - allow_ambiguous = bool(read_cfg.get("allow_ambiguous", False)) - - sql_rel = clean_cfg.get("sql") - if not sql_rel: + base_dir: Path | None, +) -> tuple[Path, str, dict[str, Any]]: + sql_ref = clean_cfg.get("sql") + if not sql_ref: raise ValueError("clean.sql missing in dataset.yml (expected: clean: { sql: 'sql/clean.sql' })") - sql_path_obj = _resolve_sql_path(sql_rel, base_dir=base_dir) + sql_path_obj = _resolve_sql_path(sql_ref, base_dir=base_dir) if not sql_path_obj.exists(): raise FileNotFoundError(f"CLEAN SQL file not found: {sql_path_obj}") - sql = sql_path_obj.read_text(encoding="utf-8") template_ctx = {"year": year, "dataset": dataset} - sql = render_template(sql, template_ctx) + sql = render_template(sql_path_obj.read_text(encoding="utf-8"), template_ctx) + return sql_path_obj, sql, template_ctx + + +def _write_rendered_sql( + out_dir: Path, + sql: str, + *, + policy: str, + output_cfg: dict[str, Any] | None, +) -> Path | None: + if not should_write("clean", "rendered_sql", policy, {"output": output_cfg or {}}): + return None + + run_dir = out_dir / "_run" + run_dir.mkdir(parents=True, exist_ok=True) + rendered_sql_path = run_dir / "clean_rendered.sql" + rendered_sql_path.write_text(sql, encoding="utf-8") + return rendered_sql_path - rendered_sql_path: Path | None = None - if should_write("clean", "rendered_sql", policy, {"output": output_cfg or {}}): - run_dir = out_dir / "_run" - run_dir.mkdir(parents=True, exist_ok=True) - rendered_sql_path = run_dir / "clean_rendered.sql" - rendered_sql_path.write_text(sql, encoding="utf-8") + +def _selection_params(read_cfg: dict[str, Any], logger) -> tuple[str, str, bool, bool]: + selection_mode = read_cfg.get("mode") + glob_pattern = read_cfg.get("glob", "*") + prefer_from_raw_run = bool(read_cfg.get("prefer_from_raw_run", True)) + allow_ambiguous = bool(read_cfg.get("allow_ambiguous", False)) if selection_mode is None and read_cfg.get("include") is not None: selection_mode = "explicit" @@ -112,16 +88,32 @@ def run_clean( ) selection_mode = "largest" + return str(selection_mode), glob_pattern, prefer_from_raw_run, allow_ambiguous + + +def _select_clean_inputs( + raw_dir: Path, + *, + logger, + mode: str, + root: str | None, + dataset: str, + year: int, + glob: str, + prefer_from_raw_run: bool, + include, + allow_ambiguous: bool, +) -> list[Path]: input_files = select_raw_input( raw_dir, logger, - mode=str(selection_mode), + mode=mode, root=root, dataset=dataset, year=year, - glob=glob_pattern, + glob=glob, prefer_from_raw_run=prefer_from_raw_run, - include=read_cfg.get("include"), + include=include, allow_ambiguous=allow_ambiguous, ) if not input_files: @@ -130,28 +122,38 @@ def run_clean( f"Expected one of: {sorted(SUPPORTED_INPUT_EXTS)}" ) - if len(input_files) > 1 and selection_mode != "all": + if len(input_files) > 1 and mode != "all": raise ValueError( "CLEAN input selection returned multiple files for a single-file mode. " "Use clean.read.mode=all to read multiple inputs." ) if len(input_files) == 1: - logger.info(f"CLEAN selected RAW input -> {input_files[0]}") + logger.info("CLEAN selected RAW input -> %s", input_files[0]) else: logger.info("CLEAN selected RAW inputs -> %s", [str(path) for path in input_files]) - output_path = out_dir / f"{dataset}_{year}_clean.parquet" - read_source_used, read_params_used = _run_sql( - input_files, - sql, - output_path, - read_cfg=relation_read_cfg, - read_mode=read_mode, - logger=logger, - ) + return input_files - outputs = [file_record(output_path)] + +def _clean_metadata_payload( + *, + dataset: str, + year: int, + base_dir: Path | None, + root_dir: Path, + clean_cfg: dict[str, Any], + sql_path_obj: Path, + rendered_sql_path: Path | None, + template_ctx: dict[str, Any], + read_mode: str, + read_params_source: list[str], + read_source_used: str, + read_params_used: dict[str, Any], + input_files: list[Path], + outputs: list[dict[str, Any]], + policy: str, +) -> dict[str, Any]: metadata_payload = { "layer": "clean", "dataset": dataset, @@ -175,11 +177,114 @@ def run_clean( "sql_rendered_absolute": str(rendered_sql_path.resolve()) if rendered_sql_path else None, "output_root_absolute": str(root_dir.resolve()), } + return metadata_payload + + +def _run_sql( + input_files: list[Path], + sql_query: str, + output_path: Path, + *, + read_cfg: dict[str, Any] | None = None, + read_mode: str = "fallback", + logger=None, +) -> tuple[str, dict[str, Any]]: + con = duckdb.connect(":memory:") + try: + read_info = read_raw_to_relation(con, input_files, read_cfg, read_mode, logger) + con.execute(f"CREATE TABLE clean_out AS {sql_query}") + output_path.parent.mkdir(parents=True, exist_ok=True) + con.execute( + f"COPY clean_out TO '{sql_path(output_path)}' (FORMAT PARQUET);" + ) + return read_info.source, read_info.params_used + finally: + con.close() + + +def run_clean( + dataset: str, + year: int, + root: str | None, + clean_cfg: dict, + logger, + *, + base_dir: Path | None = None, + output_cfg: dict[str, Any] | None = None, +): + policy = resolve_artifact_policy(output_cfg) + root_dir = resolve_root(root) + raw_dir = layer_year_dir(root, "raw", dataset, year) + out_dir = layer_year_dir(root, "clean", dataset, year) + out_dir.mkdir(parents=True, exist_ok=True) + + if not raw_dir.exists(): + raise FileNotFoundError(f"RAW dir not found: {raw_dir}. Run: toolkit run raw -c dataset.yml") + + read_mode = str(clean_cfg.get("read_mode", "fallback")) + read_cfg, relation_read_cfg, read_params_source = resolve_clean_read_cfg(raw_dir, clean_cfg, logger) + selection_mode, glob_pattern, prefer_from_raw_run, allow_ambiguous = _selection_params( + read_cfg, + logger, + ) + sql_path_obj, sql, template_ctx = _load_clean_sql( + clean_cfg, + dataset=dataset, + year=year, + base_dir=base_dir, + ) + rendered_sql_path = _write_rendered_sql( + out_dir, + sql, + policy=policy, + output_cfg=output_cfg, + ) + input_files = _select_clean_inputs( + raw_dir, + logger=logger, + mode=selection_mode, + root=root, + dataset=dataset, + year=year, + glob=glob_pattern, + prefer_from_raw_run=prefer_from_raw_run, + include=read_cfg.get("include"), + allow_ambiguous=allow_ambiguous, + ) + + output_path = out_dir / f"{dataset}_{year}_clean.parquet" + read_source_used, read_params_used = _run_sql( + input_files, + sql, + output_path, + read_cfg=relation_read_cfg, + read_mode=read_mode, + logger=logger, + ) + + outputs = [file_record(output_path)] + metadata_payload = _clean_metadata_payload( + dataset=dataset, + year=year, + base_dir=base_dir, + root_dir=root_dir, + clean_cfg=clean_cfg, + sql_path_obj=sql_path_obj, + rendered_sql_path=rendered_sql_path, + template_ctx=template_ctx, + read_mode=read_mode, + read_params_source=read_params_source, + read_source_used=read_source_used, + read_params_used=read_params_used, + input_files=input_files, + outputs=outputs, + policy=policy, + ) metadata_path = write_metadata( out_dir, metadata_payload, ) - write_manifest( + write_layer_manifest( out_dir, metadata_path=metadata_path.name, validation_path="_validate/clean_validation.json", diff --git a/toolkit/clean/validate.py b/toolkit/clean/validate.py index 7e5f25a..05db349 100644 --- a/toolkit/clean/validate.py +++ b/toolkit/clean/validate.py @@ -7,7 +7,7 @@ import duckdb from toolkit.core.config_models import CleanValidationSpec, RangeRuleConfig -from toolkit.core.metadata import write_manifest +from toolkit.core.metadata import write_layer_manifest from toolkit.core.paths import layer_year_dir, to_root_relative from toolkit.core.validation import ( ValidationResult, @@ -22,6 +22,29 @@ def _q_ident(col: str) -> str: return '"' + col.replace('"', '""') + '"' +def _clean_validation_spec( + *, + required: list[str] | None = None, + primary_key: list[str] | None = None, + not_null: list[str] | None = None, + ranges: dict[str, RangeRuleConfig | dict[str, float]] | None = None, + max_null_pct: dict[str, float] | None = None, + min_rows: int | None = None, +) -> CleanValidationSpec: + return CleanValidationSpec.model_validate( + { + "required_columns": required, + "validate": { + "primary_key": primary_key, + "not_null": not_null, + "ranges": ranges or {}, + "max_null_pct": max_null_pct or {}, + "min_rows": min_rows, + }, + } + ) + + def validate_clean( parquet_path: str | Path, required: list[str] | None = None, @@ -47,17 +70,13 @@ def validate_clean( - max_null_pct: {"col": 0.05} (5% max NULLs) - min_rows: minimum row count allowed """ - spec = CleanValidationSpec.model_validate( - { - "required_columns": required, - "validate": { - "primary_key": primary_key, - "not_null": not_null, - "ranges": ranges or {}, - "max_null_pct": max_null_pct or {}, - "min_rows": min_rows, - }, - } + spec = _clean_validation_spec( + required=required, + primary_key=primary_key, + not_null=not_null, + ranges=ranges, + max_null_pct=max_null_pct, + min_rows=min_rows, ) required = spec.required_columns rules = spec.validate @@ -81,91 +100,85 @@ def validate_clean( ) con = duckdb.connect(":memory:") - con.execute(f"CREATE VIEW t AS SELECT * FROM read_parquet('{p.as_posix()}')") - - # Columns - cols = [r[0] for r in con.execute("DESCRIBE t").fetchall()] - - # Required columns - required_result = required_columns_check(cols, required) - errors.extend(required_result.errors) - - # Row count - row_count = int(con.execute("SELECT COUNT(*) FROM t").fetchone()[0]) - if row_count == 0: - errors.append("CLEAN parquet has 0 rows") - if min_rows is not None and row_count < min_rows: - errors.append(f"CLEAN row_count too small: {row_count} < {min_rows}") - - # Not-null checks - for c in not_null: - if c not in cols: - warnings.append(f"Not-null rule column missing in data: '{c}'") - continue - qc = _q_ident(c) - nnull = int(con.execute(f"SELECT COUNT(*) FROM t WHERE {qc} IS NULL").fetchone()[0]) - if nnull > 0: - errors.append(f"Column '{c}' has NULLs: {nnull}") - - # Null percentage thresholds - if row_count > 0: - for c, thr in max_null_pct.items(): + try: + con.execute(f"CREATE VIEW t AS SELECT * FROM read_parquet('{p.as_posix()}')") + + cols = [r[0] for r in con.execute("DESCRIBE t").fetchall()] + + required_result = required_columns_check(cols, required) + errors.extend(required_result.errors) + + row_count = int(con.execute("SELECT COUNT(*) FROM t").fetchone()[0]) + if row_count == 0: + errors.append("CLEAN parquet has 0 rows") + if min_rows is not None and row_count < min_rows: + errors.append(f"CLEAN row_count too small: {row_count} < {min_rows}") + + for c in not_null: if c not in cols: - warnings.append(f"Null-pct rule column missing in data: '{c}'") + warnings.append(f"Not-null rule column missing in data: '{c}'") continue qc = _q_ident(c) nnull = int(con.execute(f"SELECT COUNT(*) FROM t WHERE {qc} IS NULL").fetchone()[0]) - pct = nnull / row_count - if pct > thr: - errors.append(f"Column '{c}' null_pct too high: {pct:.3%} > {thr:.3%}") - - # Primary key uniqueness - if primary_key: - if not all(c in cols for c in primary_key): - warnings.append(f"Primary key columns not all present: {primary_key}") - else: - key_expr = ", ".join(_q_ident(c) for c in primary_key) - dup_groups = int( - con.execute( - f""" - SELECT COUNT(*) FROM ( - SELECT {key_expr}, COUNT(*) AS n - FROM t - GROUP BY {key_expr} - HAVING COUNT(*) > 1 - ) d - """ - ).fetchone()[0] - ) - if dup_groups > 0: - errors.append(f"Primary key duplicates found for {primary_key}: groups={dup_groups}") - - # Range checks (violation = below min OR above max) - for c, rule in ranges.items(): - if c not in cols: - warnings.append(f"Range rule column missing in data: '{c}'") - continue - - qc = _q_ident(c) - violations: list[str] = [] - if rule.min is not None: - violations.append(f"{qc} < {rule.min}") - if rule.max is not None: - violations.append(f"{qc} > {rule.max}") - - if not violations: - warnings.append(f"Range rule for '{c}' has no min/max, skipping") - continue - - where = f"{qc} IS NOT NULL AND (" + " OR ".join(violations) + ")" - bad = int(con.execute(f"SELECT COUNT(*) FROM t WHERE {where}").fetchone()[0]) - if bad > 0: - errors.append( - f"Range check failed for '{c}': bad_rows={bad} " - f"rules={{'min': {rule.min}, 'max': {rule.max}}}" - ) - - con.close() + if nnull > 0: + errors.append(f"Column '{c}' has NULLs: {nnull}") + + if row_count > 0: + for c, thr in max_null_pct.items(): + if c not in cols: + warnings.append(f"Null-pct rule column missing in data: '{c}'") + continue + qc = _q_ident(c) + nnull = int(con.execute(f"SELECT COUNT(*) FROM t WHERE {qc} IS NULL").fetchone()[0]) + pct = nnull / row_count + if pct > thr: + errors.append(f"Column '{c}' null_pct too high: {pct:.3%} > {thr:.3%}") + + if primary_key: + if not all(c in cols for c in primary_key): + warnings.append(f"Primary key columns not all present: {primary_key}") + else: + key_expr = ", ".join(_q_ident(c) for c in primary_key) + dup_groups = int( + con.execute( + f""" + SELECT COUNT(*) FROM ( + SELECT {key_expr}, COUNT(*) AS n + FROM t + GROUP BY {key_expr} + HAVING COUNT(*) > 1 + ) d + """ + ).fetchone()[0] + ) + if dup_groups > 0: + errors.append(f"Primary key duplicates found for {primary_key}: groups={dup_groups}") + + for c, rule in ranges.items(): + if c not in cols: + warnings.append(f"Range rule column missing in data: '{c}'") + continue + + qc = _q_ident(c) + violations: list[str] = [] + if rule.min is not None: + violations.append(f"{qc} < {rule.min}") + if rule.max is not None: + violations.append(f"{qc} > {rule.max}") + + if not violations: + warnings.append(f"Range rule for '{c}' has no min/max, skipping") + continue + + where = f"{qc} IS NOT NULL AND (" + " OR ".join(violations) + ")" + bad = int(con.execute(f"SELECT COUNT(*) FROM t WHERE {where}").fetchone()[0]) + if bad > 0: + errors.append( + f"Range check failed for '{c}': bad_rows={bad} " + f"rules={{'min': {rule.min}, 'max': {rule.max}}}" + ) + finally: + con.close() return ValidationResult( ok=len(errors) == 0, @@ -213,7 +226,7 @@ def run_clean_validation(cfg, year: int, logger) -> dict[str, Any]: report = write_validation_json(Path(out_dir) / "_validate" / "clean_validation.json", result) metadata = json.loads((out_dir / "metadata.json").read_text(encoding="utf-8")) - write_manifest( + write_layer_manifest( out_dir, metadata_path="metadata.json", validation_path="_validate/clean_validation.json", diff --git a/toolkit/cli/app.py b/toolkit/cli/app.py index 80b4cc0..4173862 100644 --- a/toolkit/cli/app.py +++ b/toolkit/cli/app.py @@ -7,7 +7,6 @@ from toolkit.cli.cmd_resume import register as register_resume from toolkit.cli.cmd_status import register as register_status from toolkit.cli.cmd_validate import register as register_validate -from toolkit.cli.cmd_gen_sql import register as register_gen_sql from toolkit.cli.cmd_inspect import register as register_inspect app = typer.Typer(no_args_is_help=True, add_completion=False) @@ -18,7 +17,6 @@ register_resume(app) register_status(app) register_validate(app) -register_gen_sql(app) register_inspect(app) def main(): diff --git a/toolkit/cli/cmd_gen_sql.py b/toolkit/cli/cmd_gen_sql.py deleted file mode 100644 index 804c338..0000000 --- a/toolkit/cli/cmd_gen_sql.py +++ /dev/null @@ -1,55 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from typing import Any - -import typer - -from toolkit.core.config import load_config -from toolkit.core.logging import get_logger -from toolkit.clean.generator import generate_clean_sql - - -def gen_sql( - config: str = typer.Option(..., "--config", "-c", help="Path to dataset.yml"), - year: int | None = typer.Option(None, "--year", "-y", help="Anno specifico (se omesso: primo anno in dataset.yml)"), - out_dir: str | None = typer.Option(None, "--out", help="Root progetto dove creare sql/_generated/clean.sql"), - strict_config: bool = typer.Option(False, "--strict-config", help="Treat deprecated config forms as errors"), -): - """ - Genera uno skeleton CLEAN SQL da clean.mapping (assist, non pipeline). - Scrive: /sql/_generated/clean.sql - """ - strict_config_flag = strict_config if isinstance(strict_config, bool) else False - cfg = load_config(config, strict_config=strict_config_flag) - logger = get_logger() - - clean_cfg: dict[str, Any] = cfg.clean or {} - mapping = clean_cfg.get("mapping") - if not isinstance(mapping, dict) or not mapping: - raise typer.BadParameter("dataset.yml deve includere clean.mapping (non vuota).") - - derive = clean_cfg.get("derive") - if derive is not None and not isinstance(derive, dict): - raise typer.BadParameter("clean.derive deve essere una mappa (dict) oppure omesso.") - - y = int(year) if year is not None else int(list(cfg.years)[0]) - - root = Path(out_dir) if out_dir else cfg.base_dir - target_dir = root / "sql" / "_generated" - target_dir.mkdir(parents=True, exist_ok=True) - target_path = target_dir / "clean.sql" - - sql_text = generate_clean_sql( - dataset=cfg.dataset, - year=y, - mapping=mapping, - derive=derive, - ) - - target_path.write_text(sql_text, encoding="utf-8") - logger.info(f"GEN-SQL -> {target_path}") - - -def register(app: typer.Typer) -> None: - app.command("gen-sql")(gen_sql) diff --git a/toolkit/core/config_models.py b/toolkit/core/config_models.py index 2174d2a..a5eb64f 100644 --- a/toolkit/core/config_models.py +++ b/toolkit/core/config_models.py @@ -312,40 +312,6 @@ def _normalize_lists(cls, value: Any, info) -> list[str]: return ensure_str_list(value, f"clean.validate.{info.field_name}") -class CleanMappingSpec(BaseModel): - model_config = ConfigDict(extra="forbid", populate_by_name=True) - - from_: str = Field(alias="from") - type: Literal["int", "integer", "float", "double", "str", "string", "date"] = "str" - normalize: list[str] | None = None - nullify: list[str] | None = None - replace: dict[str, str] | None = None - parse: MappingParseConfig | None = None - - -class MappingParseConfig(BaseModel): - model_config = ConfigDict(extra="allow") - - kind: str | None = None - locale: str | None = None - options: dict[str, Any] | None = None - - @field_validator("options", mode="before") - @classmethod - def _validate_options(cls, value: Any) -> dict[str, Any] | None: - if value is None: - return None - if not isinstance(value, dict): - raise ValueError("clean.mapping.*.parse.options must be a dict") - return dict(value) - - -class CleanDeriveFieldConfig(BaseModel): - model_config = ConfigDict(extra="forbid") - - expr: str - - class CleanConfig(BaseModel): model_config = ConfigDict(extra="allow", populate_by_name=True) @@ -353,8 +319,6 @@ class CleanConfig(BaseModel): read_mode: Literal["strict", "fallback", "robust"] = "fallback" read_source: Literal["auto", "config_only"] | None = None read: CleanReadConfig | None = None - mapping: dict[str, CleanMappingSpec] | None = None - derive: dict[str, CleanDeriveFieldConfig] | None = None required_columns: list[str] = Field(default_factory=list) validate_config: CleanValidateConfig = Field( default_factory=CleanValidateConfig, diff --git a/toolkit/core/manifest.py b/toolkit/core/manifest.py index 5320c4a..699fa26 100644 --- a/toolkit/core/manifest.py +++ b/toolkit/core/manifest.py @@ -6,13 +6,13 @@ from toolkit.core.io import read_json, write_json_atomic -def write_manifest(folder: Path, payload: dict[str, Any]) -> Path: +def write_raw_manifest(folder: Path, payload: dict[str, Any]) -> Path: path = folder / "manifest.json" write_json_atomic(path, payload) return path -def read_manifest(folder: Path) -> dict[str, Any] | None: +def read_raw_manifest(folder: Path) -> dict[str, Any] | None: path = folder / "manifest.json" if not path.exists(): return None diff --git a/toolkit/core/metadata.py b/toolkit/core/metadata.py index d00c136..84a78d1 100644 --- a/toolkit/core/metadata.py +++ b/toolkit/core/metadata.py @@ -60,7 +60,7 @@ def write_metadata(folder: Path, data: dict[str, Any], filename: str = "metadata return out -def write_manifest( +def write_layer_manifest( folder: Path, *, metadata_path: str, diff --git a/toolkit/core/registry.py b/toolkit/core/registry.py index 937597a..d8bcad5 100644 --- a/toolkit/core/registry.py +++ b/toolkit/core/registry.py @@ -58,20 +58,6 @@ def clear(self) -> None: "optional": False, "factory": lambda cls: (lambda **client: cls()), }, - { - "name": "api_json_paged", - "module": "toolkit.plugins.api_json_paged", - "class_name": "ApiJsonPagedSource", - "optional": True, - "factory": lambda cls: (lambda **client: cls(**client)), - }, - { - "name": "html_table", - "module": "toolkit.plugins.html_table", - "class_name": "HtmlTableSource", - "optional": True, - "factory": lambda cls: (lambda **client: cls(**client)), - }, ) @@ -91,15 +77,6 @@ def register_builtin_plugins( plugin_class = getattr(module, spec["class_name"]) target.register(spec["name"], spec["factory"](plugin_class)) except Exception as exc: - if spec["optional"]: - message = ( - f"DCLPLUGIN001 optional plugin '{spec['name']}' unavailable: {exc}. " - f"Install/repair dependencies or disable the plugin." - ) - logger.warning(message) - if strict: - raise PluginRegistrationError(message) from exc - continue raise PluginRegistrationError( f"Required built-in plugin '{spec['name']}' failed to register: {exc}" ) from exc diff --git a/toolkit/core/template.py b/toolkit/core/template.py index f1a9a84..0623c1e 100644 --- a/toolkit/core/template.py +++ b/toolkit/core/template.py @@ -5,11 +5,12 @@ def render_template(text: str, ctx: dict[str, Any]) -> str: """ - Templating MINIMO e deterministico. - Supporta placeholder stile: {year}, {dataset}. - Non รจ Jinja: niente logica, niente espressioni. + Minimal deterministic templating used by the runtime contract. + + Supports only plain placeholders such as `{year}` and `{dataset}`. + This is intentionally not a general templating engine. """ out = text for k, v in ctx.items(): out = out.replace("{" + k + "}", str(v)) - return out \ No newline at end of file + return out diff --git a/toolkit/core/validation_summary.py b/toolkit/core/validation_summary.py deleted file mode 100644 index bba4cbc..0000000 --- a/toolkit/core/validation_summary.py +++ /dev/null @@ -1,14 +0,0 @@ -from __future__ import annotations - -import warnings - -from toolkit.core.validation import build_validation_summary - -warnings.warn( - "Deprecated import path 'toolkit.core.validation_summary'; use 'toolkit.core.validation' instead; " - "will be removed in v0.5.", - DeprecationWarning, - stacklevel=2, -) - -__all__ = ["build_validation_summary"] diff --git a/toolkit/core/validators.py b/toolkit/core/validators.py deleted file mode 100644 index 91c46f2..0000000 --- a/toolkit/core/validators.py +++ /dev/null @@ -1,15 +0,0 @@ -from __future__ import annotations - -import warnings - -from toolkit.core.validation import ValidationResult -from toolkit.core.validation import required_columns_check as required_columns - -warnings.warn( - "Deprecated import path 'toolkit.core.validators'; use 'toolkit.core.validation' instead; " - "will be removed in v0.5.", - DeprecationWarning, - stacklevel=2, -) - -__all__ = ["ValidationResult", "required_columns"] diff --git a/toolkit/mart/run.py b/toolkit/mart/run.py index acd50ec..be17aa4 100644 --- a/toolkit/mart/run.py +++ b/toolkit/mart/run.py @@ -6,7 +6,7 @@ import duckdb from toolkit.core.artifacts import ARTIFACT_POLICY_DEBUG, resolve_artifact_policy, should_write -from toolkit.core.metadata import config_hash_for_year, file_record, write_manifest, write_metadata +from toolkit.core.metadata import config_hash_for_year, file_record, write_layer_manifest, write_metadata from toolkit.core.paths import layer_year_dir, resolve_root, to_root_relative from toolkit.core.template import render_template @@ -149,7 +149,7 @@ def run_mart( mart_dir, metadata_payload, ) - write_manifest( + write_layer_manifest( mart_dir, metadata_path=metadata_path.name, validation_path="_validate/mart_validation.json", diff --git a/toolkit/mart/validate.py b/toolkit/mart/validate.py index 876376d..0666ee7 100644 --- a/toolkit/mart/validate.py +++ b/toolkit/mart/validate.py @@ -7,7 +7,7 @@ import duckdb from toolkit.core.config_models import MartTableRuleConfig, MartValidationSpec -from toolkit.core.metadata import write_manifest +from toolkit.core.metadata import write_layer_manifest from toolkit.core.paths import layer_year_dir, to_root_relative from toolkit.core.validation import ( ValidationResult, @@ -221,7 +221,7 @@ def run_mart_validation(cfg, year: int, logger) -> dict[str, Any]: report = write_validation_json(Path(mart_dir) / "_validate" / "mart_validation.json", result) metadata = json.loads((mart_dir / "metadata.json").read_text(encoding="utf-8")) - write_manifest( + write_layer_manifest( mart_dir, metadata_path="metadata.json", validation_path="_validate/mart_validation.json", diff --git a/toolkit/plugins/__init__.py b/toolkit/plugins/__init__.py index b68b398..f6fc45e 100644 --- a/toolkit/plugins/__init__.py +++ b/toolkit/plugins/__init__.py @@ -1,4 +1,9 @@ """Built-in plugin modules. Plugin registration is explicit via `toolkit.core.registry.register_builtin_plugins()`. + +Builtin stable sources exposed by the default runtime: + +- `local_file` +- `http_file` """ diff --git a/toolkit/plugins/api_json_paged.py b/toolkit/plugins/api_json_paged.py deleted file mode 100644 index d1b39b5..0000000 --- a/toolkit/plugins/api_json_paged.py +++ /dev/null @@ -1,114 +0,0 @@ -from __future__ import annotations - -import json -import time -from typing import Any - -import requests - -from toolkit.core.exceptions import DownloadError - - -class ApiJsonPagedSource: - """ - Fetch paginated JSON from an API and return CSV-bytes (header included). - - Assumptions (simple & flexible): - - Each page returns either: - A) a list of dicts, OR - B) a dict containing a list under `items_path` (default "items") - - Pagination increases by integer page: page=1..N (customizable) - """ - - def __init__( - self, - timeout: int = 60, - retries: int = 2, - sleep_seconds: float = 0.0, - user_agent: str | None = None, - ): - self.timeout = timeout - self.retries = retries - self.sleep_seconds = sleep_seconds - self.user_agent = user_agent or "dataciviclab-toolkit/0.1" - - def fetch( - self, - base_url: str, - params: dict[str, Any] | None = None, - *, - page_param: str = "page", - start_page: int = 1, - items_path: str = "items", - max_pages: int = 10_000, - ) -> bytes: - headers = {"User-Agent": self.user_agent} - params = dict(params or {}) - all_items: list[dict[str, Any]] = [] - - page = start_page - last_err: Exception | None = None - - while page <= max_pages: - p = dict(params) - p[page_param] = page - - ok = False - for _ in range(self.retries + 1): - try: - r = requests.get(base_url, params=p, headers=headers, timeout=self.timeout) - r.raise_for_status() - payload = r.json() - - if isinstance(payload, list): - items = payload - elif isinstance(payload, dict): - items = payload.get(items_path, []) - else: - items = [] - - if not items: - ok = True - page = max_pages + 1 # break outer - break - - # keep only dict rows - all_items.extend([x for x in items if isinstance(x, dict)]) - - ok = True - break - except Exception as e: - last_err = e - time.sleep(0.25) - - if not ok: - raise DownloadError(str(last_err) if last_err else f"Failed to fetch {base_url}") - - page += 1 - if self.sleep_seconds: - time.sleep(self.sleep_seconds) - - # Convert to CSV bytes (simple, deterministic) - if not all_items: - return b"" - - # stable column order: union of keys, sorted - cols = sorted({k for row in all_items for k in row.keys()}) - - def esc(v: Any) -> str: - if v is None: - return "" - if isinstance(v, (dict, list)): - v = json.dumps(v, ensure_ascii=False) - s = str(v) - # CSV escaping - if any(ch in s for ch in [",", "\n", "\r", '"']): - s = '"' + s.replace('"', '""') + '"' - return s - - lines = [] - lines.append(",".join(cols)) - for row in all_items: - lines.append(",".join(esc(row.get(c)) for c in cols)) - - return ("\n".join(lines) + "\n").encode("utf-8") diff --git a/toolkit/plugins/html_table.py b/toolkit/plugins/html_table.py deleted file mode 100644 index 0fa1e56..0000000 --- a/toolkit/plugins/html_table.py +++ /dev/null @@ -1,33 +0,0 @@ -from __future__ import annotations - -import pandas as pd -import requests - -from toolkit.core.exceptions import DownloadError - - -class HtmlTableSource: - """Fetch an HTML page, parse the first table (or by index), return CSV bytes.""" - - def __init__(self, timeout: int = 60, retries: int = 2, user_agent: str | None = None): - self.timeout = timeout - self.retries = retries - self.user_agent = user_agent or "dataciviclab-toolkit/0.1" - - def fetch(self, url: str, *, table_index: int = 0) -> bytes: - headers = {"User-Agent": self.user_agent} - last_err: Exception | None = None - - for _ in range(self.retries + 1): - try: - r = requests.get(url, headers=headers, timeout=self.timeout) - r.raise_for_status() - tables = pd.read_html(r.text) - if not tables: - return b"" - idx = max(0, min(table_index, len(tables) - 1)) - return tables[idx].to_csv(index=False).encode("utf-8") - except Exception as e: - last_err = e - - raise DownloadError(str(last_err) if last_err else f"Failed to parse HTML tables from {url}") diff --git a/toolkit/profile/__init__.py b/toolkit/profile/__init__.py index 8921398..2cfcac3 100644 --- a/toolkit/profile/__init__.py +++ b/toolkit/profile/__init__.py @@ -1 +1,5 @@ -# toolkit/profile/__init__.py \ No newline at end of file +"""Advanced profiling helpers. + +`toolkit.profile` is supported tooling for diagnostics and format inference on RAW +inputs. It is not a first-class pipeline layer like `raw`, `clean`, or `mart`. +""" diff --git a/toolkit/profile/raw.py b/toolkit/profile/raw.py index 1daadfd..d42b89e 100644 --- a/toolkit/profile/raw.py +++ b/toolkit/profile/raw.py @@ -20,6 +20,10 @@ def _safe_mkdir(p: Path) -> None: p.mkdir(parents=True, exist_ok=True) +def _raw_files(raw_dir: Path) -> list[Path]: + return sorted([p for p in raw_dir.glob("*") if p.is_file()]) + + def _try_decode(filepath: Path, enc: str) -> Optional[str]: try: with filepath.open("r", encoding=enc, errors="strict") as f: @@ -84,6 +88,54 @@ def suggest_skip(sample_text: str, delim: Optional[str]) -> int: return 0 +def _sniff_file_profile(file0: Path) -> tuple[str, str, Optional[str], Optional[str], int]: + enc, txt = sniff_encoding(file0) + delim = sniff_delim(txt) + dec = sniff_decimal(txt) + skip = suggest_skip(txt, delim) + return enc, txt, delim, dec, skip + + +def _preview_columns(header_line: str | None, delim: str | None) -> list[str]: + if not header_line or not delim: + return [] + parts = [segment.strip() for segment in header_line.split(delim)] + return [_normalize_colname(part) for part in parts if part.strip()] + + +def build_profile_hints(filepath: Path) -> Dict[str, Any]: + enc, txt = sniff_encoding(filepath) + delim = sniff_delim(txt) + dec = sniff_decimal(txt) + skip = suggest_skip(txt, delim) + warnings: list[str] = [] + + if skip: + warnings.append( + "header_preamble_detected: first non-empty line looks like a title row, consider skip: 1" + ) + + header_line: str | None = None + try: + with filepath.open("r", encoding=enc, errors="replace") as f: + for _ in range(skip): + f.readline() + header_line = f.readline().rstrip("\n\r") + except Exception as exc: + warnings.append(f"header_read_failed: {type(exc).__name__}: {exc}") + + return { + "file_used": filepath.name, + "encoding_suggested": enc, + "delim_suggested": delim, + "decimal_suggested": dec, + "skip_suggested": skip, + "header_line": header_line, + "columns_preview": _preview_columns(header_line, delim), + "warnings": warnings, + } + + def _build_read_csv_opts(read_cfg: Dict[str, Any]) -> str: opts = ["union_by_name=true"] @@ -202,6 +254,38 @@ def _pick_data_file(files: List[Path]) -> Path: return files[0] +def _effective_profile_read_cfg( + read_cfg: Optional[Dict[str, Any]], + *, + encoding: str, + delim: Optional[str], + decimal: Optional[str], + skip: int, +) -> dict[str, Any]: + effective_read_cfg = dict(read_cfg) if isinstance(read_cfg, dict) else {} + effective_read_cfg.pop("source", None) + if "delim" not in effective_read_cfg and "sep" not in effective_read_cfg and delim: + effective_read_cfg["delim"] = delim + if "encoding" not in effective_read_cfg and encoding: + effective_read_cfg["encoding"] = encoding + if "decimal" not in effective_read_cfg and decimal: + effective_read_cfg["decimal"] = decimal + if "skip" not in effective_read_cfg and skip: + effective_read_cfg["skip"] = skip + effective_read_cfg.setdefault("header", True) + return effective_read_cfg + + +def _read_header_line(file0: Path, *, encoding: str, skip_n: int) -> str | None: + try: + with file0.open("r", encoding=encoding, errors="replace") as f: + for _ in range(skip_n): + f.readline() + return f.readline().rstrip("\n\r") + except Exception: + return None + + def _sample_values(sample_rows: List[Dict[str, Any]], col: str, limit: int = 25) -> List[str]: vals: List[str] = [] for r in sample_rows: @@ -304,6 +388,50 @@ def _build_mapping_suggestions(columns: List[str], sample_rows: List[Dict[str, A return out +def _profile_view( + con: duckdb.DuckDBPyConnection, + file0: Path, + *, + effective_read_cfg: dict[str, Any], +) -> None: + opt_sql = _build_read_csv_opts(effective_read_cfg) + con.execute( + f"CREATE OR REPLACE VIEW v AS " + f"SELECT * FROM read_csv('{sql_str(str(file0))}', {opt_sql});" + ) + + +def _describe_columns(con: duckdb.DuckDBPyConnection) -> tuple[list[str], list[str]]: + cols = con.execute("DESCRIBE v").fetchall() + columns_raw = [r[0] for r in cols] + columns_norm = [_normalize_colname(c) for c in columns_raw] + return columns_raw, columns_norm + + +def _sample_profile_rows( + con: duckdb.DuckDBPyConnection, + columns_raw: list[str], +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + df = con.execute("SELECT * FROM v LIMIT 50").fetchdf() + sample_rows = df.to_dict(orient="records") + + missingness_top: list[dict[str, Any]] = [] + for c in columns_raw[:200]: + n, nmiss = con.execute( + f""" + SELECT + COUNT(*) AS n, + SUM(CASE WHEN "{c}" IS NULL OR TRIM(CAST("{c}" AS VARCHAR)) = '' THEN 1 ELSE 0 END) AS n_missing + FROM v + """ + ).fetchone() + if n: + missingness_top.append({"column": c, "missing_pct": float(nmiss) / float(n) * 100.0}) + + missingness_top = sorted(missingness_top, key=lambda x: -x["missing_pct"])[:25] + return sample_rows, missingness_top + + @dataclass class RawProfile: dataset: str @@ -328,28 +456,19 @@ class RawProfile: def profile_raw(raw_dir: Path, dataset: str, year: int, read_cfg: Optional[Dict[str, Any]] = None) -> RawProfile: - files = sorted([p for p in raw_dir.glob("*") if p.is_file()]) + files = _raw_files(raw_dir) if not files: raise FileNotFoundError(f"No RAW files found in {raw_dir}") file0 = _pick_data_file(files) - - enc, txt = sniff_encoding(file0) - delim = sniff_delim(txt) - dec = sniff_decimal(txt) - skip = suggest_skip(txt, delim) - - effective_read_cfg = dict(read_cfg) if isinstance(read_cfg, dict) else {} - effective_read_cfg.pop("source", None) - if "delim" not in effective_read_cfg and "sep" not in effective_read_cfg and delim: - effective_read_cfg["delim"] = delim - if "encoding" not in effective_read_cfg and enc: - effective_read_cfg["encoding"] = enc - if "decimal" not in effective_read_cfg and dec: - effective_read_cfg["decimal"] = dec - if "skip" not in effective_read_cfg and skip: - effective_read_cfg["skip"] = skip - effective_read_cfg.setdefault("header", True) + enc, txt, delim, dec, skip = _sniff_file_profile(file0) + effective_read_cfg = _effective_profile_read_cfg( + read_cfg, + encoding=enc, + delim=delim, + decimal=dec, + skip=skip, + ) warnings: List[str] = [] header_line: Optional[str] = None @@ -363,55 +482,36 @@ def profile_raw(raw_dir: Path, dataset: str, year: int, read_cfg: Optional[Dict[ if skip: warnings.append("header_preamble_detected: first non-empty line looks like a title row, consider skip: 1") - # header line (respect skip) - try: - skip_n = int(effective_read_cfg.get("skip") or 0) - with file0.open("r", encoding=effective_read_cfg.get("encoding") or enc, errors="replace") as f: - for _ in range(skip_n): - f.readline() - header_line = f.readline().rstrip("\n\r") - except Exception as e: - warnings.append(f"header_read_failed: {type(e).__name__}: {e}") + skip_n = int(effective_read_cfg.get("skip") or 0) + header_line = _read_header_line( + file0, + encoding=effective_read_cfg.get("encoding") or enc, + skip_n=skip_n, + ) + if header_line is None: + warnings.append("header_read_failed: could not read header line") con = duckdb.connect(":memory:") try: - opt_sql = _build_read_csv_opts(effective_read_cfg) try: - con.execute( - f"CREATE OR REPLACE VIEW v AS " - f"SELECT * FROM read_csv('{sql_str(str(file0))}', {opt_sql});" + _profile_view( + con, + file0, + effective_read_cfg=effective_read_cfg, ) except Exception as e: - fallback_cfg = robust_preset(effective_read_cfg) - robust_read_suggested = True warnings.append(f"profile_read_retry: {type(e).__name__}: {e}") - fallback_sql = _build_read_csv_opts(fallback_cfg) - con.execute( - f"CREATE OR REPLACE VIEW v AS " - f"SELECT * FROM read_csv('{sql_str(str(file0))}', {fallback_sql});" + robust_read_suggested = True + fallback_cfg = robust_preset(effective_read_cfg) + fallback_cfg.setdefault("auto_detect", False) + _profile_view( + con, + file0, + effective_read_cfg=fallback_cfg, ) - cols = con.execute("DESCRIBE v").fetchall() - columns_raw = [r[0] for r in cols] - columns_norm = [_normalize_colname(c) for c in columns_raw] - - df = con.execute("SELECT * FROM v LIMIT 50").fetchdf() - sample_rows = df.to_dict(orient="records") - - for c in columns_raw[:200]: - n, nmiss = con.execute( - f""" - SELECT - COUNT(*) AS n, - SUM(CASE WHEN "{c}" IS NULL OR TRIM(CAST("{c}" AS VARCHAR)) = '' THEN 1 ELSE 0 END) AS n_missing - FROM v - """ - ).fetchone() - if n: - missingness_top.append({"column": c, "missing_pct": float(nmiss) / float(n) * 100.0}) - - missingness_top = sorted(missingness_top, key=lambda x: -x["missing_pct"])[:25] - + columns_raw, columns_norm = _describe_columns(con) + sample_rows, missingness_top = _sample_profile_rows(con, columns_raw) mapping_suggestions = _build_mapping_suggestions(columns_raw, sample_rows) except Exception as e: diff --git a/toolkit/raw/run.py b/toolkit/raw/run.py index ce5590a..09a839a 100644 --- a/toolkit/raw/run.py +++ b/toolkit/raw/run.py @@ -5,12 +5,13 @@ from pathlib import Path from urllib.parse import urlparse -from toolkit.core.manifest import write_manifest as write_raw_manifest +from toolkit.core.manifest import write_raw_manifest from toolkit.core.config import parse_bool from toolkit.core.metadata import config_hash_for_year, sha256_bytes, write_metadata from toolkit.core.paths import layer_year_dir, to_root_relative from toolkit.core.registry import register_builtin_plugins, registry from toolkit.core.validation import write_validation_json +from toolkit.profile.raw import build_profile_hints from toolkit.raw.extractors import get_extractor from toolkit.raw.validate import validate_raw_output @@ -245,6 +246,17 @@ def run_raw( ) primary_output_file = _choose_primary_output(manifest_sources, logger) + primary_output_path = out_dir / primary_output_file + profile_hints = None + try: + if primary_output_path.exists() and primary_output_path.suffix.lower() in { + ".csv", + ".tsv", + ".txt", + }: + profile_hints = build_profile_hints(primary_output_path) + except Exception as exc: + logger.warning("RAW profile_hints generation failed: %s: %s", type(exc).__name__, exc) metadata_path = write_metadata( out_dir, @@ -261,6 +273,7 @@ def run_raw( for f in files_written ], "files": files_written, + "profile_hints": profile_hints, }, ) diff --git a/toolkit/raw/validate.py b/toolkit/raw/validate.py index 0b2d469..e2cfb12 100644 --- a/toolkit/raw/validate.py +++ b/toolkit/raw/validate.py @@ -4,7 +4,7 @@ from typing import Any import json -from toolkit.core.manifest import read_manifest, write_manifest +from toolkit.core.manifest import read_raw_manifest, write_raw_manifest from toolkit.core.paths import layer_year_dir from toolkit.core.validation import ValidationResult, build_validation_summary, write_validation_json @@ -106,7 +106,7 @@ def run_raw_validation(root: str | None, dataset: str, year: int, logger) -> dic result = validate_raw_output(out_dir, files) report = write_validation_json(out_dir / "raw_validation.json", result) - existing_manifest = read_manifest(out_dir) or { + existing_manifest = read_raw_manifest(out_dir) or { "dataset": dataset, "year": year, "run_id": str(metadata.get("run_id") or "unknown"), @@ -126,6 +126,6 @@ def run_raw_validation(root: str | None, dataset: str, year: int, logger) -> dic "outputs": metadata.get("outputs", []), } ) - write_manifest(out_dir, existing_manifest) + write_raw_manifest(out_dir, existing_manifest) logger.info(f"VALIDATE RAW -> {report} (ok={result.ok})") return build_validation_summary(result)