Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tests/test_cross_year.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path
import shutil
import json

from toolkit.cli.cmd_run import run as run_cmd

Expand Down Expand Up @@ -45,3 +46,8 @@ def test_cli_run_cross_year_on_project_example(tmp_path: Path, monkeypatch) -> N
assert (cross_dir / "clean_union.parquet").exists()
assert (cross_dir / "metadata.json").exists()
assert (cross_dir / "manifest.json").exists()
assert (cross_dir / "_validate" / "cross_validation.json").exists()

manifest = json.loads((cross_dir / "manifest.json").read_text(encoding="utf-8"))
assert manifest["validation"] == "_validate/cross_validation.json"
assert manifest["summary"]["ok"] is True
39 changes: 39 additions & 0 deletions tests/test_validate_layers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import json
from pathlib import Path
import re
from types import SimpleNamespace

import duckdb

from toolkit.raw.validate import validate_raw_output
from toolkit.clean.validate import validate_clean
from toolkit.cross.validate import run_cross_validation, validate_cross_outputs
from toolkit.mart.validate import validate_mart
from toolkit.core.validation import write_validation_json

Expand Down Expand Up @@ -131,3 +133,40 @@ def test_validate_mart_report_uses_root_relative_dir(tmp_path: Path):
field="dir",
expected="data/mart/demo/2024",
)


def test_validate_cross_outputs_required_tables(tmp_path: Path):
    """A required table without a parquet file must make CROSS validation fail."""
    cross_dir = tmp_path / "cross"
    cross_dir.mkdir(parents=True, exist_ok=True)

    # Only "foo" is written; "bar" is required below but deliberately absent.
    _write_parquet(cross_dir / "foo.parquet", "CREATE TABLE t AS SELECT 1 AS k")

    result = validate_cross_outputs(
        cross_dir,
        required_tables=["foo", "bar"],
        years=[2022, 2023],
    )

    assert result.ok is False
    assert any("Missing required CROSS tables" in message for message in result.errors)
    # The requested years are echoed back unchanged in the summary.
    assert result.summary["years"] == [2022, 2023]


def test_run_cross_validation_does_not_require_metadata_json(tmp_path: Path):
    """run_cross_validation writes its report and manifest when no metadata.json exists."""
    project_root = tmp_path / "root"
    out_dir = project_root / "data" / "cross" / "demo"
    out_dir.mkdir(parents=True, exist_ok=True)
    # Note: no metadata.json is created in out_dir, only a parquet output.
    _write_parquet(out_dir / "foo.parquet", "CREATE TABLE t AS SELECT 1 AS k")

    def _discard(*args, **kwargs):
        # Stand-in for logger.info: accept anything, do nothing.
        return None

    silent_logger = SimpleNamespace(info=_discard)
    cfg = SimpleNamespace(
        root=project_root,
        dataset="demo",
        cross_year={"tables": [{"name": "foo", "sql": "sql/cross/foo.sql"}]},
    )

    summary = run_cross_validation(cfg, [2022, 2023], logger=silent_logger)
    assert summary["passed"] is True

    report_path = out_dir / "_validate" / "cross_validation.json"
    manifest_path = out_dir / "manifest.json"
    assert report_path.exists()
    assert manifest_path.exists()

    manifest_payload = json.loads(manifest_path.read_text(encoding="utf-8"))
    assert manifest_payload["validation"] == "_validate/cross_validation.json"
    assert manifest_payload["summary"]["ok"] is True
5 changes: 5 additions & 0 deletions toolkit/cli/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from toolkit.clean.run import run_clean
from toolkit.clean.validate import run_clean_validation
from toolkit.cross.run import run_cross_year
from toolkit.cross.validate import run_cross_validation
from toolkit.core.logging import bind_logger, get_logger
from toolkit.core.paths import layer_dataset_dir, layer_year_dir
from toolkit.core.run_context import RunContext
Expand Down Expand Up @@ -154,6 +155,10 @@ def run_cross_year_step(
base_dir=cfg.base_dir,
output_cfg=cfg.output,
)
summary = run_cross_validation(cfg, selected_years, logger)
fail_on_error = bool((cfg.validation or {}).get("fail_on_error", True))
if not summary.get("passed", False) and fail_on_error:
raise ValidationGateError("CROSS_YEAR validation failed")


def run_year(
Expand Down
106 changes: 106 additions & 0 deletions toolkit/cross/validate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import duckdb

from toolkit.core.metadata import file_record, write_layer_manifest
from toolkit.core.paths import layer_dataset_dir, to_root_relative
from toolkit.core.validation import ValidationResult, build_validation_summary, write_validation_json


def validate_cross_outputs(
    cross_dir: str | Path,
    *,
    required_tables: list[str] | None = None,
    root: str | Path | None = None,
    years: list[int] | None = None,
) -> ValidationResult:
    """Validate the parquet outputs found directly inside a CROSS directory.

    Args:
        cross_dir: Directory expected to contain one ``<table>.parquet`` per table.
        required_tables: Table names (parquet file stems) that must be present.
            Duplicates are collapsed and the list is sorted for the report.
        root: When given, the directory is reported via ``to_root_relative``
            against this root; otherwise the path is reported as ``str(cross_dir)``.
        years: Years covered by the run; copied verbatim into the summary.

    Returns:
        A ``ValidationResult`` whose ``ok`` is False when the directory is
        missing or a required table has no parquet file. Failures to count
        rows are recorded as warnings and do not affect ``ok``.
    """
    required = sorted(set(required_tables or []))
    d = Path(cross_dir)

    # Single summary skeleton shared by the early-exit and normal paths so the
    # two reports cannot drift apart.
    summary: dict[str, Any] = {
        "dir": to_root_relative(d, Path(root)) if root is not None else str(d),
        "years": list(years or []),
        "tables": [],
        "required_tables": required,
        "row_counts": {},
    }

    if not d.exists():
        return ValidationResult(
            ok=False,
            errors=[f"Missing CROSS dir: {d}"],
            warnings=[],
            summary=summary,
        )

    existing_files = sorted(d.glob("*.parquet"))
    existing_tables = sorted(path.stem for path in existing_files)
    present = set(existing_tables)
    missing = [table for table in required if table not in present]

    errors: list[str] = []
    warnings: list[str] = []
    if missing:
        errors.append(f"Missing required CROSS tables: {missing}")

    row_counts: dict[str, int] = {}
    con = duckdb.connect(":memory:")
    try:
        for path in existing_files:
            try:
                # Bind the path as a parameter rather than splicing it into the
                # SQL text: a path containing a single quote would otherwise
                # terminate the string literal and break the query.
                counted = con.execute(
                    "SELECT COUNT(*) FROM read_parquet(?)",
                    [path.as_posix()],
                ).fetchone()
                row_counts[path.stem] = int(counted[0])
            except Exception as exc:
                # Best effort: an unreadable file is reported, not fatal.
                warnings.append(f"Could not count rows for {path.name}: {exc}")
    finally:
        con.close()

    summary["tables"] = existing_tables
    summary["row_counts"] = row_counts
    return ValidationResult(
        ok=len(errors) == 0,
        errors=errors,
        warnings=warnings,
        summary=summary,
    )


def run_cross_validation(cfg, years: list[int], logger) -> dict[str, Any]:
    """Validate the CROSS layer of a dataset and persist the report and manifest.

    Args:
        cfg: Configuration object; ``root``, ``dataset`` and ``cross_year`` are read.
        years: Years forwarded to ``validate_cross_outputs`` for the summary.
        logger: Object exposing ``info``; receives one line with the report
            location and the overall outcome.

    Returns:
        The value of ``build_validation_summary`` for the validation result.
    """
    cross_dir = layer_dataset_dir(cfg.root, "cross", cfg.dataset)
    cross_path = Path(cross_dir)

    # Every configured table that is a dict with a truthy "name" is required.
    required_tables = []
    for spec in (cfg.cross_year or {}).get("tables", []):
        if isinstance(spec, dict) and spec.get("name"):
            required_tables.append(spec.get("name"))

    result = validate_cross_outputs(
        cross_dir,
        required_tables=required_tables,
        root=cfg.root,
        years=years,
    )
    report = write_validation_json(cross_path / "_validate" / "cross_validation.json", result)

    # Prefer the outputs recorded in metadata.json; when that file is absent,
    # derive the records from the parquet files on disk.
    metadata_file = cross_path / "metadata.json"
    if not metadata_file.exists():
        outputs = [file_record(parquet) for parquet in sorted(cross_path.glob("*.parquet"))]
    else:
        recorded = json.loads(metadata_file.read_text(encoding="utf-8"))
        outputs = recorded.get("outputs", [])

    write_layer_manifest(
        cross_dir,
        metadata_path="metadata.json",
        validation_path="_validate/cross_validation.json",
        outputs=outputs,
        ok=result.ok,
        errors_count=len(result.errors),
        warnings_count=len(result.warnings),
    )
    logger.info("VALIDATE CROSS_YEAR -> %s (ok=%s)", report, result.ok)
    return build_validation_summary(result)
Loading