diff --git a/tests/test_cli_status.py b/tests/test_cli_status.py index 67aaf48..fd1b585 100644 --- a/tests/test_cli_status.py +++ b/tests/test_cli_status.py @@ -155,3 +155,153 @@ def test_status_reports_raw_hints_when_raw_artifacts_exist(tmp_path: Path, monke assert "delim: ;" in result.output assert "skip: 1" in result.output assert "header_preamble_detected" in result.output + + +def test_status_reports_validation_summary_from_layer_artifacts(tmp_path: Path, monkeypatch) -> None: + project_dir = tmp_path / "project" + config_path = project_dir / "dataset.yml" + project_dir.mkdir() + + config_path.write_text( + """ +root: "./out" +dataset: + name: demo_ds + years: [2022] +raw: {} +clean: + sql: "sql/clean.sql" + required_columns: ["id", "value"] +mart: + tables: + - name: mart_ok + sql: "sql/mart/mart_ok.sql" + required_tables: ["mart_ok", "mart_missing"] +cross_year: + tables: + - name: cross_ok + sql: "sql/cross/cross_ok.sql" +""".strip(), + encoding="utf-8", + ) + + sql_mart_dir = project_dir / "sql" / "mart" + sql_cross_dir = project_dir / "sql" / "cross" + sql_mart_dir.mkdir(parents=True, exist_ok=True) + sql_cross_dir.mkdir(parents=True, exist_ok=True) + (project_dir / "sql" / "clean.sql").write_text("select 1 as value", encoding="utf-8") + (sql_mart_dir / "mart_ok.sql").write_text("select * from clean_input", encoding="utf-8") + (sql_cross_dir / "cross_ok.sql").write_text("select * from clean_input", encoding="utf-8") + + clean_dir = project_dir / "out" / "data" / "clean" / "demo_ds" / "2022" + mart_dir = project_dir / "out" / "data" / "mart" / "demo_ds" / "2022" + cross_dir = project_dir / "out" / "data" / "cross" / "demo_ds" + (clean_dir / "_validate").mkdir(parents=True, exist_ok=True) + (mart_dir / "_validate").mkdir(parents=True, exist_ok=True) + (cross_dir / "_validate").mkdir(parents=True, exist_ok=True) + + (clean_dir / "demo_ds_2022_clean.parquet").write_text("placeholder", encoding="utf-8") + (cross_dir / "cross_ok.parquet").write_text("placeholder", encoding="utf-8") + + (clean_dir / "manifest.json").write_text( + json.dumps( + { + "validation": "_validate/clean_validation.json", + "summary": {"ok": True, "errors_count": 0, "warnings_count": 1}, + "outputs": [{"file": "demo_ds_2022_clean.parquet"}], + }, + indent=2, + ), + encoding="utf-8", + ) + (clean_dir / "_validate" / "clean_validation.json").write_text( + json.dumps( + { + "ok": True, + "errors": [], + "warnings": ["header_preamble_detected"], + "summary": { + "required": ["id", "value"], + "columns": ["id"], + }, + }, + indent=2, + ), + encoding="utf-8", + ) + + (mart_dir / "manifest.json").write_text( + json.dumps( + { + "validation": "_validate/mart_validation.json", + "summary": {"ok": False, "errors_count": 1, "warnings_count": 1}, + "outputs": [{"file": "mart_ok.parquet"}], + }, + indent=2, + ), + encoding="utf-8", + ) + (mart_dir / "_validate" / "mart_validation.json").write_text( + json.dumps( + { + "ok": False, + "errors": ["Missing required MART tables: ['mart_missing']"], + "warnings": ["MART table_rules reference tables not declared in mart.tables: ['mart_extra']"], + "summary": { + "required_tables": ["mart_ok", "mart_missing"], + "tables": ["mart_ok"], + "per_table": {}, + }, + }, + indent=2, + ), + encoding="utf-8", + ) + + (cross_dir / "manifest.json").write_text( + json.dumps( + { + "validation": "_validate/cross_validation.json", + "summary": {"ok": True, "errors_count": 0, "warnings_count": 0}, + "outputs": [{"file": "cross_ok.parquet"}], + }, + indent=2, + ), + encoding="utf-8", + ) + (cross_dir / "_validate" / "cross_validation.json").write_text( + json.dumps( + { + "ok": True, + "errors": [], + "warnings": [], + "summary": { + "required_tables": [], + "tables": ["cross_ok"], + }, + }, + indent=2, + ), + encoding="utf-8", + ) + + run_dir = get_run_dir(project_dir / "out", "demo_ds", 2022) + _write_run_record(run_dir / "run-123.json", "run-123", "2026-03-04T10:00:00+00:00", "FAILED") + + monkeypatch.chdir(tmp_path) + runner = CliRunner() + + result = runner.invoke( + app, + ["status", "--dataset", "demo_ds", "--year", "2022", "--latest", "--config", str(config_path)], + ) + + assert result.exit_code == 0 + assert "validation_summary:" in result.output + assert "clean: state=passed warnings=1 errors=0" in result.output + assert "warnings_present: yes" in result.output + assert "missing_columns=value" in result.output + assert "mart: state=failed warnings=1 errors=1" in result.output + assert "missing_tables=mart_missing" in result.output + assert "missing_outputs=mart_ok.parquet" in result.output + assert "cross_year: state=passed warnings=0 errors=0" in result.output diff --git a/toolkit/cli/cmd_status.py b/toolkit/cli/cmd_status.py index 6c88798..c9aa3e8 100644 --- a/toolkit/cli/cmd_status.py +++ b/toolkit/cli/cmd_status.py @@ -2,11 +2,12 @@ import json from pathlib import Path +from typing import Any import typer from toolkit.core.config import load_config -from toolkit.core.paths import layer_year_dir +from toolkit.core.paths import layer_dataset_dir, layer_year_dir from toolkit.core.run_context import get_run_dir, latest_run, read_run_record @@ -48,6 +49,174 @@ def _raw_hints(root: Path, dataset: str, year: int) -> dict[str, object]: } +def _layer_artifacts_dir(root: Path, dataset: str, year: int, layer: str) -> Path: + if layer == "cross_year": + return layer_dataset_dir(root, "cross", dataset) + return layer_year_dir(root, layer, dataset, year) + + +def _validation_counts( + validation_payload: dict[str, Any] | None, + manifest_payload: dict[str, Any] | None, + record_summary: dict[str, Any] | None, +) -> tuple[bool | None, int | None, int | None]: + if validation_payload is not None: + return ( + validation_payload.get("ok"), + len(validation_payload.get("errors") or []), + len(validation_payload.get("warnings") or []), + ) + + manifest_summary = (manifest_payload or {}).get("summary") or {} + if manifest_summary: + return ( + manifest_summary.get("ok"), + manifest_summary.get("errors_count"), + manifest_summary.get("warnings_count"), + ) + + record_summary = record_summary or {} + if record_summary: + return ( + record_summary.get("passed"), + record_summary.get("errors_count"), + record_summary.get("warnings_count"), + ) + + return None, None, None + + +def _layer_validation_summary( + root: Path, + dataset: str, + year: int, + layer: str, + record: dict[str, Any], +) -> dict[str, Any] | None: + layer_dir = _layer_artifacts_dir(root, dataset, year, layer) + manifest_payload = _read_json(layer_dir / "manifest.json") + validation_rel = (manifest_payload or {}).get("validation") + validation_payload = None + validation_path = None + if isinstance(validation_rel, str) and validation_rel.strip(): + validation_path = layer_dir / validation_rel + validation_payload = _read_json(validation_path) + + record_summary = (record.get("validations") or {}).get(layer, {}) + ok, errors_count, warnings_count = _validation_counts( + validation_payload, + manifest_payload, + record_summary if isinstance(record_summary, dict) else {}, + ) + + has_any_data = any( + [ + manifest_payload is not None, + validation_payload is not None, + bool(record_summary), + layer_dir.exists(), + ] + ) + if not has_any_data: + return None + + warnings = [] + errors = [] + details: list[str] = [] + if validation_payload is not None: + warnings = [str(item) for item in (validation_payload.get("warnings") or [])] + errors = [str(item) for item in (validation_payload.get("errors") or [])] + + if validation_path is not None and validation_payload is None: + details.append(f"validation_missing={validation_path.name}") + + outputs = (manifest_payload or {}).get("outputs") or [] + if isinstance(outputs, list): + missing_outputs = [] + for entry in outputs: + if not isinstance(entry, dict): + continue + file_name = entry.get("file") + if isinstance(file_name, str) and file_name and not (layer_dir / file_name).exists(): + missing_outputs.append(file_name) + if missing_outputs: + details.append(f"missing_outputs={', '.join(missing_outputs)}") + + summary = (validation_payload or {}).get("summary") or {} + if layer == "clean": + required = summary.get("required") or [] + columns = summary.get("columns") or [] + if isinstance(required, list) and isinstance(columns, list): + missing_columns = [column for column in required if column not in set(columns)] + if missing_columns: + details.append(f"missing_columns={', '.join(str(column) for column in missing_columns)}") + if layer in {"mart", "cross_year"}: + required_tables = summary.get("required_tables") or [] + tables = summary.get("tables") or [] + if isinstance(required_tables, list) and isinstance(tables, list): + missing_tables = [table for table in required_tables if table not in set(tables)] + if missing_tables: + details.append(f"missing_tables={', '.join(str(table) for table in missing_tables)}") + + if ok is True: + state = "passed" + elif ok is False: + state = "failed" + elif manifest_payload is not None: + state = "not_validated" + else: + state = "unknown" + + return { + "layer": layer, + "state": state, + "warnings_count": warnings_count, + "errors_count": errors_count, + "has_warnings": bool(warnings_count), + "warning_items": warnings, + "error_items": errors, + "details": details, + } + + +def _print_validation_summary( + root: Path, + dataset: str, + year: int, + record: dict[str, Any], + has_cross_year: bool, +) -> None: + summaries: list[dict[str, Any]] = [] + for layer in ("clean", "mart"): + summary = _layer_validation_summary(root, dataset, year, layer, record) + if summary is not None: + summaries.append(summary) + + if has_cross_year: + summary = _layer_validation_summary(root, dataset, year, "cross_year", record) + if summary is not None: + summaries.append(summary) + + if not summaries: + return + + typer.echo("") + typer.echo("validation_summary:") + for summary in summaries: + warnings_count = summary.get("warnings_count") + errors_count = summary.get("errors_count") + typer.echo( + f" {summary['layer']}: " + f"state={summary['state']} " + f"warnings={warnings_count if warnings_count is not None else '?'} " + f"errors={errors_count if errors_count is not None else '?'}" + ) + if summary.get("has_warnings"): + typer.echo(" warnings_present: yes") + for detail in summary.get("details") or []: + typer.echo(f" {detail}") + + def status( dataset: str = typer.Option(..., "--dataset", help="Dataset name"), year: int = typer.Option(..., "--year", help="Dataset year"), @@ -66,6 +235,7 @@ def status( cfg = load_config(config, strict_config=strict_config_flag) run_dir = get_run_dir(cfg.root, dataset, year) record = read_run_record(run_dir, run_id) if run_id else latest_run(run_dir) + has_cross_year = bool((cfg.cross_year or {}).get("tables")) typer.echo(f"dataset: {record.get('dataset')}") typer.echo(f"year: {record.get('year')}") @@ -93,6 +263,7 @@ def status( typer.echo("layer layer_status validation_passed errors_count warnings_count") for layer in ("raw", "clean", "mart"): typer.echo(_layer_row(record, layer)) + _print_validation_summary(Path(cfg.root), dataset, year, record, has_cross_year) if record.get("status") == "FAILED" and record.get("error"): typer.echo("")