From 743a1895e6d45b671c2387754c9bd1e5a5418925 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Wed, 11 Mar 2026 22:38:14 +0000 Subject: [PATCH 1/2] cross: aggiungi validation minima per output multi-anno --- tests/test_cross_year.py | 6 +++ tests/test_validate_layers.py | 13 +++++ toolkit/cli/cmd_run.py | 5 ++ toolkit/cross/validate.py | 95 +++++++++++++++++++++++++++++++++++ 4 files changed, 119 insertions(+) create mode 100644 toolkit/cross/validate.py diff --git a/tests/test_cross_year.py b/tests/test_cross_year.py index b42442c..3eda9f5 100644 --- a/tests/test_cross_year.py +++ b/tests/test_cross_year.py @@ -1,5 +1,6 @@ from pathlib import Path import shutil +import json from toolkit.cli.cmd_run import run as run_cmd @@ -45,3 +46,8 @@ def test_cli_run_cross_year_on_project_example(tmp_path: Path, monkeypatch) -> N assert (cross_dir / "clean_union.parquet").exists() assert (cross_dir / "metadata.json").exists() assert (cross_dir / "manifest.json").exists() + assert (cross_dir / "_validate" / "cross_validation.json").exists() + + manifest = json.loads((cross_dir / "manifest.json").read_text(encoding="utf-8")) + assert manifest["validation"] == "_validate/cross_validation.json" + assert manifest["summary"]["ok"] is True diff --git a/tests/test_validate_layers.py b/tests/test_validate_layers.py index 4792cff..7241eb0 100644 --- a/tests/test_validate_layers.py +++ b/tests/test_validate_layers.py @@ -6,6 +6,7 @@ from toolkit.raw.validate import validate_raw_output from toolkit.clean.validate import validate_clean +from toolkit.cross.validate import validate_cross_outputs from toolkit.mart.validate import validate_mart from toolkit.core.validation import write_validation_json @@ -131,3 +132,15 @@ def test_validate_mart_report_uses_root_relative_dir(tmp_path: Path): field="dir", expected="data/mart/demo/2024", ) + + +def test_validate_cross_outputs_required_tables(tmp_path: Path): + d = tmp_path / "cross" + d.mkdir(parents=True, exist_ok=True) + + _write_parquet(d / "foo.parquet", "CREATE TABLE t AS SELECT 1 AS k") + + res = validate_cross_outputs(d, required_tables=["foo", "bar"], years=[2022, 2023]) + assert res.ok is False + assert any("Missing required CROSS tables" in e for e in res.errors) + assert res.summary["years"] == [2022, 2023] diff --git a/toolkit/cli/cmd_run.py b/toolkit/cli/cmd_run.py index 87caf85..b3b2768 100644 --- a/toolkit/cli/cmd_run.py +++ b/toolkit/cli/cmd_run.py @@ -8,6 +8,7 @@ from toolkit.clean.run import run_clean from toolkit.clean.validate import run_clean_validation from toolkit.cross.run import run_cross_year +from toolkit.cross.validate import run_cross_validation from toolkit.core.logging import bind_logger, get_logger from toolkit.core.paths import layer_dataset_dir, layer_year_dir from toolkit.core.run_context import RunContext @@ -154,6 +155,10 @@ def run_cross_year_step( base_dir=cfg.base_dir, output_cfg=cfg.output, ) + summary = run_cross_validation(cfg, selected_years, logger) + fail_on_error = bool((cfg.validation or {}).get("fail_on_error", True)) + if not summary.get("passed", False) and fail_on_error: + raise ValidationGateError("CROSS_YEAR validation failed") def run_year( diff --git a/toolkit/cross/validate.py b/toolkit/cross/validate.py new file mode 100644 index 0000000..90b8e94 --- /dev/null +++ b/toolkit/cross/validate.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import duckdb + +from toolkit.core.metadata import write_layer_manifest +from toolkit.core.paths import layer_dataset_dir, to_root_relative +from toolkit.core.validation import ValidationResult, build_validation_summary, write_validation_json + + +def validate_cross_outputs( + cross_dir: str | Path, + *, + required_tables: list[str] | None = None, + root: str | Path | None = None, + years: list[int] | None = None, +) -> ValidationResult: + required = sorted(set(required_tables or [])) + d = Path(cross_dir) + dir_value = to_root_relative(d, Path(root)) if root is not None else str(d) + if not d.exists(): + return ValidationResult( + ok=False, + errors=[f"Missing CROSS dir: {d}"], + warnings=[], + summary={"dir": dir_value, "years": list(years or [])}, + ) + + existing_files = sorted(d.glob("*.parquet")) + existing_tables = sorted(path.stem for path in existing_files) + missing = [table for table in required if table not in existing_tables] + + errors: list[str] = [] + warnings: list[str] = [] + if missing: + errors.append(f"Missing required CROSS tables: {missing}") + + con = duckdb.connect(":memory:") + try: + row_counts: dict[str, int] = {} + for path in existing_files: + try: + row_counts[path.stem] = int( + con.execute(f"SELECT COUNT(*) FROM read_parquet('{path.as_posix()}')").fetchone()[0] + ) + except Exception as exc: + warnings.append(f"Could not count rows for {path.name}: {exc}") + finally: + con.close() + + return ValidationResult( + ok=len(errors) == 0, + errors=errors, + warnings=warnings, + summary={ + "dir": dir_value, + "years": list(years or []), + "tables": existing_tables, + "required_tables": required, + "row_counts": row_counts, + }, + ) + + +def run_cross_validation(cfg, years: list[int], logger) -> dict[str, Any]: + cross_dir = layer_dataset_dir(cfg.root, "cross", cfg.dataset) + required_tables = [ + table.get("name") + for table in (cfg.cross_year or {}).get("tables", []) + if isinstance(table, dict) and table.get("name") + ] + + result = validate_cross_outputs( + cross_dir, + required_tables=required_tables, + root=cfg.root, + years=years, + ) + + report = write_validation_json(Path(cross_dir) / "_validate" / "cross_validation.json", result) + metadata = json.loads((cross_dir / "metadata.json").read_text(encoding="utf-8")) + write_layer_manifest( + cross_dir, + metadata_path="metadata.json", + validation_path="_validate/cross_validation.json", + outputs=metadata.get("outputs", []), + ok=result.ok, + errors_count=len(result.errors), + warnings_count=len(result.warnings), + ) + logger.info("VALIDATE CROSS_YEAR -> %s (ok=%s)", report, result.ok) + return build_validation_summary(result) From 5e248c862d155d0752f1cec3f13f7f8453970767 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Wed, 11 Mar 2026 22:42:09 +0000 Subject: [PATCH 2/2] cross: non richiedere metadata.json per la validation --- tests/test_validate_layers.py | 28 +++++++++++++++++++++++++++- toolkit/cross/validate.py | 19 +++++++++++++++---- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/tests/test_validate_layers.py b/tests/test_validate_layers.py index 7241eb0..1878904 100644 --- a/tests/test_validate_layers.py +++ b/tests/test_validate_layers.py @@ -1,12 +1,13 @@ import json from pathlib import Path import re +from types import SimpleNamespace import duckdb from toolkit.raw.validate import validate_raw_output from toolkit.clean.validate import validate_clean -from toolkit.cross.validate import validate_cross_outputs +from toolkit.cross.validate import run_cross_validation, validate_cross_outputs from toolkit.mart.validate import validate_mart from toolkit.core.validation import write_validation_json @@ -144,3 +145,28 @@ def test_validate_cross_outputs_required_tables(tmp_path: Path): assert res.ok is False assert any("Missing required CROSS tables" in e for e in res.errors) assert res.summary["years"] == [2022, 2023] + + +def test_run_cross_validation_does_not_require_metadata_json(tmp_path: Path): + root = tmp_path / "root" + cross_dir = root / "data" / "cross" / "demo" + cross_dir.mkdir(parents=True, exist_ok=True) + _write_parquet(cross_dir / "foo.parquet", "CREATE TABLE t AS SELECT 1 AS k") + + cfg = SimpleNamespace( + root=root, + dataset="demo", + cross_year={"tables": [{"name": "foo", "sql": "sql/cross/foo.sql"}]}, + ) + + summary = run_cross_validation(cfg, [2022, 2023], logger=SimpleNamespace(info=lambda *args, **kwargs: None)) + + assert summary["passed"] is True + report = cross_dir / "_validate" / "cross_validation.json" + manifest = cross_dir / "manifest.json" + assert report.exists() + assert manifest.exists() + + manifest_payload = json.loads(manifest.read_text(encoding="utf-8")) + assert manifest_payload["validation"] == "_validate/cross_validation.json" + assert manifest_payload["summary"]["ok"] is True diff --git a/toolkit/cross/validate.py b/toolkit/cross/validate.py index 90b8e94..265a62b 100644 --- a/toolkit/cross/validate.py +++ b/toolkit/cross/validate.py @@ -6,7 +6,7 @@ import duckdb -from toolkit.core.metadata import write_layer_manifest +from toolkit.core.metadata import file_record, write_layer_manifest from toolkit.core.paths import layer_dataset_dir, to_root_relative from toolkit.core.validation import ValidationResult, build_validation_summary, write_validation_json @@ -26,7 +26,13 @@ def validate_cross_outputs( ok=False, errors=[f"Missing CROSS dir: {d}"], warnings=[], - summary={"dir": dir_value, "years": list(years or [])}, + summary={ + "dir": dir_value, + "years": list(years or []), + "tables": [], + "required_tables": required, + "row_counts": {}, + }, ) existing_files = sorted(d.glob("*.parquet")) @@ -81,12 +87,17 @@ def run_cross_validation(cfg, years: list[int], logger) -> dict[str, Any]: ) report = write_validation_json(Path(cross_dir) / "_validate" / "cross_validation.json", result) - metadata = json.loads((cross_dir / "metadata.json").read_text(encoding="utf-8")) + metadata_path = Path(cross_dir) / "metadata.json" + if metadata_path.exists(): + metadata = json.loads(metadata_path.read_text(encoding="utf-8")) + outputs = metadata.get("outputs", []) + else: + outputs = [file_record(path) for path in sorted(Path(cross_dir).glob("*.parquet"))] write_layer_manifest( cross_dir, metadata_path="metadata.json", validation_path="_validate/cross_validation.json", - outputs=metadata.get("outputs", []), + outputs=outputs, ok=result.ok, errors_count=len(result.errors), warnings_count=len(result.warnings),