Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions tests/test_cli_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,153 @@ def test_status_reports_raw_hints_when_raw_artifacts_exist(tmp_path: Path, monke
assert "delim: ;" in result.output
assert "skip: 1" in result.output
assert "header_preamble_detected" in result.output


def test_status_reports_validation_summary_from_layer_artifacts(tmp_path: Path, monkeypatch) -> None:
    """`status` renders a validation_summary section built from layer artifacts.

    Sets up three layers: clean (passed with one warning and a missing
    required column), mart (failed, with a missing required table and a
    declared output that is absent on disk), and cross_year (clean pass),
    then asserts the per-layer summary lines and detail rows in the output.
    """

    def write_json(path: Path, payload: dict) -> None:
        # Every artifact in this test is pretty-printed UTF-8 JSON.
        path.write_text(json.dumps(payload, indent=2), encoding="utf-8")

    project_dir = tmp_path / "project"
    config_path = project_dir / "dataset.yml"
    project_dir.mkdir()

    config_path.write_text(
        """
root: "./out"
dataset:
  name: demo_ds
  years: [2022]
raw: {}
clean:
  sql: "sql/clean.sql"
  required_columns: ["id", "value"]
mart:
  tables:
    - name: mart_ok
      sql: "sql/mart/mart_ok.sql"
  required_tables: ["mart_ok", "mart_missing"]
cross_year:
  tables:
    - name: cross_ok
      sql: "sql/cross/cross_ok.sql"
""".strip(),
        encoding="utf-8",
    )

    # SQL sources referenced by the config must exist for it to load.
    for rel_path, sql_text in (
        (Path("sql") / "clean.sql", "select 1 as value"),
        (Path("sql") / "mart" / "mart_ok.sql", "select * from clean_input"),
        (Path("sql") / "cross" / "cross_ok.sql", "select * from clean_input"),
    ):
        target = project_dir / rel_path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(sql_text, encoding="utf-8")

    clean_dir = project_dir / "out" / "data" / "clean" / "demo_ds" / "2022"
    mart_dir = project_dir / "out" / "data" / "mart" / "demo_ds" / "2022"
    cross_dir = project_dir / "out" / "data" / "cross" / "demo_ds"
    for layer_dir in (clean_dir, mart_dir, cross_dir):
        (layer_dir / "_validate").mkdir(parents=True, exist_ok=True)

    # Clean and cross outputs exist on disk; mart_ok.parquet is deliberately
    # absent so the mart layer reports missing_outputs.
    (clean_dir / "demo_ds_2022_clean.parquet").write_text("placeholder", encoding="utf-8")
    (cross_dir / "cross_ok.parquet").write_text("placeholder", encoding="utf-8")

    write_json(
        clean_dir / "manifest.json",
        {
            "validation": "_validate/clean_validation.json",
            "summary": {"ok": True, "errors_count": 0, "warnings_count": 1},
            "outputs": [{"file": "demo_ds_2022_clean.parquet"}],
        },
    )
    write_json(
        clean_dir / "_validate" / "clean_validation.json",
        {
            "ok": True,
            "errors": [],
            "warnings": ["header_preamble_detected"],
            "summary": {
                "required": ["id", "value"],
                "columns": ["id"],
            },
        },
    )

    write_json(
        mart_dir / "manifest.json",
        {
            "validation": "_validate/mart_validation.json",
            "summary": {"ok": False, "errors_count": 1, "warnings_count": 1},
            "outputs": [{"file": "mart_ok.parquet"}],
        },
    )
    write_json(
        mart_dir / "_validate" / "mart_validation.json",
        {
            "ok": False,
            "errors": ["Missing required MART tables: ['mart_missing']"],
            "warnings": ["MART table_rules reference tables not declared in mart.tables: ['mart_extra']"],
            "summary": {
                "required_tables": ["mart_ok", "mart_missing"],
                "tables": ["mart_ok"],
                "per_table": {},
            },
        },
    )

    write_json(
        cross_dir / "manifest.json",
        {
            "validation": "_validate/cross_validation.json",
            "summary": {"ok": True, "errors_count": 0, "warnings_count": 0},
            "outputs": [{"file": "cross_ok.parquet"}],
        },
    )
    write_json(
        cross_dir / "_validate" / "cross_validation.json",
        {
            "ok": True,
            "errors": [],
            "warnings": [],
            "summary": {
                "required_tables": [],
                "tables": ["cross_ok"],
            },
        },
    )

    run_dir = get_run_dir(project_dir / "out", "demo_ds", 2022)
    _write_run_record(run_dir / "run-123.json", "run-123", "2026-03-04T10:00:00+00:00", "FAILED")

    monkeypatch.chdir(tmp_path)
    result = CliRunner().invoke(
        app,
        ["status", "--dataset", "demo_ds", "--year", "2022", "--latest", "--config", str(config_path)],
    )

    assert result.exit_code == 0
    assert "validation_summary:" in result.output
    assert "clean: state=passed warnings=1 errors=0" in result.output
    assert "warnings_present: yes" in result.output
    assert "missing_columns=value" in result.output
    assert "mart: state=failed warnings=1 errors=1" in result.output
    assert "missing_tables=mart_missing" in result.output
    assert "missing_outputs=mart_ok.parquet" in result.output
    assert "cross_year: state=passed warnings=0 errors=0" in result.output
173 changes: 172 additions & 1 deletion toolkit/cli/cmd_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import json
from pathlib import Path
from typing import Any

import typer

from toolkit.core.config import load_config
from toolkit.core.paths import layer_year_dir
from toolkit.core.paths import layer_dataset_dir, layer_year_dir
from toolkit.core.run_context import get_run_dir, latest_run, read_run_record


Expand Down Expand Up @@ -48,6 +49,174 @@ def _raw_hints(root: Path, dataset: str, year: int) -> dict[str, object]:
}


def _layer_artifacts_dir(root: Path, dataset: str, year: int, layer: str) -> Path:
    """Resolve the artifacts directory for *layer*.

    The cross_year layer is year-agnostic and lives in the dataset-level
    "cross" directory; every other layer is partitioned by year.
    """
    return (
        layer_dataset_dir(root, "cross", dataset)
        if layer == "cross_year"
        else layer_year_dir(root, layer, dataset, year)
    )


def _validation_counts(
validation_payload: dict[str, Any] | None,
manifest_payload: dict[str, Any] | None,
record_summary: dict[str, Any] | None,
) -> tuple[bool | None, int | None, int | None]:
if validation_payload is not None:
return (
validation_payload.get("ok"),
len(validation_payload.get("errors") or []),
len(validation_payload.get("warnings") or []),
)

manifest_summary = (manifest_payload or {}).get("summary") or {}
if manifest_summary:
return (
manifest_summary.get("ok"),
manifest_summary.get("errors_count"),
manifest_summary.get("warnings_count"),
)

record_summary = record_summary or {}
if record_summary:
return (
record_summary.get("passed"),
record_summary.get("errors_count"),
record_summary.get("warnings_count"),
)

return None, None, None


def _layer_validation_summary(
    root: Path,
    dataset: str,
    year: int,
    layer: str,
    record: dict[str, Any],
) -> dict[str, Any] | None:
    """Build a render-ready validation summary dict for one layer.

    Reads the layer's ``manifest.json`` and, when the manifest points at one,
    its validation report; counts fall back to the run record's per-layer
    summary. Returns None when nothing at all is known about the layer (no
    artifacts directory, manifest, validation file, or record entry), so the
    caller can skip it.
    """
    layer_dir = _layer_artifacts_dir(root, dataset, year, layer)
    manifest_payload = _read_json(layer_dir / "manifest.json")
    # The manifest stores the validation report path relative to the layer dir.
    validation_rel = (manifest_payload or {}).get("validation")
    validation_payload = None
    validation_path = None
    if isinstance(validation_rel, str) and validation_rel.strip():
        validation_path = layer_dir / validation_rel
        validation_payload = _read_json(validation_path)

    record_summary = (record.get("validations") or {}).get(layer, {})
    ok, errors_count, warnings_count = _validation_counts(
        validation_payload,
        manifest_payload,
        record_summary if isinstance(record_summary, dict) else {},
    )

    # Report the layer only when at least one signal exists.
    has_any_data = any(
        [
            manifest_payload is not None,
            validation_payload is not None,
            bool(record_summary),
            layer_dir.exists(),
        ]
    )
    if not has_any_data:
        return None

    warnings: list[str] = []
    errors: list[str] = []
    details: list[str] = []
    if validation_payload is not None:
        warnings = [str(item) for item in (validation_payload.get("warnings") or [])]
        errors = [str(item) for item in (validation_payload.get("errors") or [])]

    # Manifest references a validation file that could not be read.
    if validation_path is not None and validation_payload is None:
        details.append(f"validation_missing={validation_path.name}")

    # Flag declared outputs that are absent on disk.
    outputs = (manifest_payload or {}).get("outputs") or []
    if isinstance(outputs, list):
        missing_outputs = []
        for entry in outputs:
            if not isinstance(entry, dict):
                continue
            file_name = entry.get("file")
            if isinstance(file_name, str) and file_name and not (layer_dir / file_name).exists():
                missing_outputs.append(file_name)
        if missing_outputs:
            details.append(f"missing_outputs={', '.join(missing_outputs)}")

    summary = (validation_payload or {}).get("summary") or {}
    if layer == "clean":
        required = summary.get("required") or []
        columns = summary.get("columns") or []
        if isinstance(required, list) and isinstance(columns, list):
            # Hoisted: build the membership set once instead of per element.
            present_columns = set(columns)
            missing_columns = [column for column in required if column not in present_columns]
            if missing_columns:
                details.append(f"missing_columns={', '.join(str(column) for column in missing_columns)}")
    if layer in {"mart", "cross_year"}:
        required_tables = summary.get("required_tables") or []
        tables = summary.get("tables") or []
        if isinstance(required_tables, list) and isinstance(tables, list):
            # Hoisted: build the membership set once instead of per element.
            present_tables = set(tables)
            missing_tables = [table for table in required_tables if table not in present_tables]
            if missing_tables:
                details.append(f"missing_tables={', '.join(str(table) for table in missing_tables)}")

    # State precedence: an explicit ok flag wins; a manifest without a verdict
    # means the layer produced artifacts but was not validated.
    if ok is True:
        state = "passed"
    elif ok is False:
        state = "failed"
    elif manifest_payload is not None:
        state = "not_validated"
    else:
        state = "unknown"

    return {
        "layer": layer,
        "state": state,
        "warnings_count": warnings_count,
        "errors_count": errors_count,
        "has_warnings": bool(warnings_count),
        "warning_items": warnings,
        "error_items": errors,
        "details": details,
    }


def _print_validation_summary(
    root: Path,
    dataset: str,
    year: int,
    record: dict[str, Any],
    has_cross_year: bool,
) -> None:
    """Echo a per-layer validation_summary section; prints nothing when no
    layer has any validation data."""
    layers = ["clean", "mart"] + (["cross_year"] if has_cross_year else [])
    summaries = [
        summary
        for layer in layers
        if (summary := _layer_validation_summary(root, dataset, year, layer, record)) is not None
    ]
    if not summaries:
        return

    typer.echo("")
    typer.echo("validation_summary:")
    for summary in summaries:
        warnings_count = summary.get("warnings_count")
        errors_count = summary.get("errors_count")
        # Unknown counts render as "?" rather than "None".
        warnings_text = "?" if warnings_count is None else warnings_count
        errors_text = "?" if errors_count is None else errors_count
        typer.echo(
            f" {summary['layer']}: "
            f"state={summary['state']} "
            f"warnings={warnings_text} "
            f"errors={errors_text}"
        )
        if summary.get("has_warnings"):
            typer.echo(" warnings_present: yes")
        for detail in summary.get("details") or []:
            typer.echo(f" {detail}")


def status(
dataset: str = typer.Option(..., "--dataset", help="Dataset name"),
year: int = typer.Option(..., "--year", help="Dataset year"),
Expand All @@ -66,6 +235,7 @@ def status(
cfg = load_config(config, strict_config=strict_config_flag)
run_dir = get_run_dir(cfg.root, dataset, year)
record = read_run_record(run_dir, run_id) if run_id else latest_run(run_dir)
has_cross_year = bool((cfg.cross_year or {}).get("tables"))

typer.echo(f"dataset: {record.get('dataset')}")
typer.echo(f"year: {record.get('year')}")
Expand Down Expand Up @@ -93,6 +263,7 @@ def status(
typer.echo("layer layer_status validation_passed errors_count warnings_count")
for layer in ("raw", "clean", "mart"):
typer.echo(_layer_row(record, layer))
_print_validation_summary(Path(cfg.root), dataset, year, record, has_cross_year)

if record.get("status") == "FAILED" and record.get("error"):
typer.echo("")
Expand Down
Loading