diff --git a/README.md b/README.md
index 5e637ed..fb3e4b5 100644
--- a/README.md
+++ b/README.md
@@ -218,6 +218,7 @@ Per il percorso base:
 - `validate all` esegue i quality checks su CLEAN e MART
 - `status` legge il run record e mostra lo stato piu` recente
 - `inspect paths` espone i path stabili per notebook e script locali, insieme ai principali hints del RAW
+- `inspect schema-diff` confronta i principali segnali di schema RAW tra gli anni configurati
 - `--dry-run` valida config e SQL senza eseguire la pipeline
 
 Esempi:
@@ -227,6 +228,7 @@ toolkit run all --config dataset.yml --strict-config
 toolkit validate all --config dataset.yml --strict-config
 toolkit status --dataset my_dataset --year 2024 --latest --config dataset.yml
 toolkit inspect paths --config dataset.yml --year 2024 --json
+toolkit inspect schema-diff --config dataset.yml --json
 toolkit run all --config dataset.yml --dry-run --strict-config
 ```
 
diff --git a/docs/advanced-workflows.md b/docs/advanced-workflows.md
index 2803ee9..fdf1bb3 100644
--- a/docs/advanced-workflows.md
+++ b/docs/advanced-workflows.md
@@ -72,6 +72,7 @@ Uso consigliato:
 
 - repo dataset nuovi: configurazione esplicita e `--strict-config`
 - `profile raw` solo se serve capire meglio il formato RAW
+- `inspect schema-diff` quando vuoi confrontare rapidamente hints e colonne tra piu anni senza aprire a mano i metadata RAW
 
 ## Artifact policy
 
diff --git a/tests/test_cli_inspect_schema_diff.py b/tests/test_cli_inspect_schema_diff.py
new file mode 100644
index 0000000..2632f27
--- /dev/null
+++ b/tests/test_cli_inspect_schema_diff.py
@@ -0,0 +1,124 @@
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+from typer.testing import CliRunner
+
+from toolkit.cli.app import app
+
+
+def _write_dataset_config(base_dir: Path) -> Path:
+    """Write a minimal two-year dataset.yml into base_dir and return its path."""
+    config_path = base_dir / "dataset.yml"
+    config_path.write_text(
+        "\n".join(
+            [
+                'root: "./_out"',
+                "",
+                "dataset:",
+                '  name: "schema_diff_example"',
+                "  years: [2022, 2023]",
+                "",
+                "raw:",
+                "  sources:",
+                '    - name: "dummy"',
+                '      type: "local_file"',
+                "      args:",
+                '        path: "input.csv"',
+                '        filename: "input_{year}.csv"',
+                "",
+                "clean:",
+                '  sql: "sql/clean.sql"',
+                "  validate: {}",
+                "",
+                "mart:",
+                "  tables: []",
+                "  validate: {}",
+                "",
+            ]
+        )
+        + "\n",
+        encoding="utf-8",
+    )
+    return config_path
+
+
+def _write_raw_year(
+    root: Path,
+    year: int,
+    file_name: str,
+    header_line: str,
+    *,
+    delim: str = ",",
+) -> None:
+    """Create the RAW dir for one year: data file, manifest.json and metadata.json."""
+    raw_dir = root / "data" / "raw" / "schema_diff_example" / str(year)
+    raw_dir.mkdir(parents=True, exist_ok=True)
+    (raw_dir / file_name).write_text(header_line + "\n1,2,3\n", encoding="utf-8")
+    (raw_dir / "manifest.json").write_text(
+        json.dumps({"primary_output_file": file_name}),
+        encoding="utf-8",
+    )
+    (raw_dir / "metadata.json").write_text(
+        json.dumps(
+            {
+                "profile_hints": {
+                    "file_used": file_name,
+                    "encoding_suggested": "utf-8",
+                    "delim_suggested": delim,
+                    "decimal_suggested": ".",
+                    "skip_suggested": 0,
+                    "header_line": header_line,
+                    "columns_preview": header_line.split(delim),
+                    "warnings": [],
+                }
+            }
+        ),
+        encoding="utf-8",
+    )
+
+
+def test_inspect_schema_diff_reports_multi_year_changes(tmp_path: Path, monkeypatch) -> None:
+    """The text report lists both years and the column added in 2023."""
+    config_path = _write_dataset_config(tmp_path)
+    root = tmp_path / "_out"
+    _write_raw_year(root, 2022, "input_2022.csv", "anno,comune,imponibile")
+    _write_raw_year(root, 2023, "input_2023.csv", "anno,comune,imponibile,contribuenti")
+
+    runner = CliRunner()
+    monkeypatch.chdir(tmp_path)
+
+    result = runner.invoke(app, ["inspect", "schema-diff", "--config", str(config_path), "--strict-config"])
+
+    assert result.exit_code == 0, result.output
+    assert "dataset: schema_diff_example" in result.output
+    assert "year: 2022" in result.output
+    assert "year: 2023" in result.output
+    assert "2022 -> 2023:" in result.output
+    assert "counts: 3 -> 4" in result.output
+    assert "added_columns:" in result.output
+    assert "contribuenti" in result.output
+
+
+def test_inspect_schema_diff_json_degrades_when_raw_is_missing(tmp_path: Path, monkeypatch) -> None:
+    """Without any RAW output the JSON payload is still emitted, with empty entries."""
+    config_path = _write_dataset_config(tmp_path)
+
+    runner = CliRunner()
+    monkeypatch.chdir(tmp_path)
+
+    result = runner.invoke(
+        app,
+        ["inspect", "schema-diff", "--config", str(config_path), "--json", "--strict-config"],
+    )
+
+    assert result.exit_code == 0, result.output
+    payload = json.loads(result.output)
+    assert payload["dataset"] == "schema_diff_example"
+    assert payload["years"] == [2022, 2023]
+    assert len(payload["entries"]) == 2
+    assert payload["entries"][0]["raw_exists"] is False
+    assert payload["entries"][0]["primary_output_file"] is None
+    assert payload["entries"][0]["columns_count"] == 0
+    assert payload["comparisons"][0]["changed"] is False
diff --git a/toolkit/cli/cmd_inspect.py b/toolkit/cli/cmd_inspect.py
index e6f7cf8..9dbd78a 100644
--- a/toolkit/cli/cmd_inspect.py
+++ b/toolkit/cli/cmd_inspect.py
@@ -9,6 +9,7 @@
 from toolkit.cli.common import iter_years
 from toolkit.core.config import load_config
 from toolkit.core.paths import layer_year_dir
 from toolkit.core.run_context import get_run_dir, latest_run
+from toolkit.profile.raw import build_profile_hints
 
 
@@ -19,6 +20,90 @@ def _read_json(path: Path) -> dict[str, Any] | None:
         return None
 
 
+def _raw_primary_file(raw_dir: Path, manifest: dict[str, Any]) -> Path | None:
+    """Return the manifest's primary output file if it exists under raw_dir, else None."""
+    primary_output_file = manifest.get("primary_output_file")
+    if isinstance(primary_output_file, str):
+        candidate = raw_dir / primary_output_file
+        if candidate.exists():
+            return candidate
+    return None
+
+
+def _raw_schema_payload(cfg, year: int) -> dict[str, Any]:
+    """Collect the RAW schema signals of one dataset year.
+
+    Hints stored in metadata.json are preferred; when they are missing the
+    primary RAW file is sniffed via build_profile_hints. A failed sniff is
+    recorded in the payload warnings instead of aborting the command.
+    """
+    root = Path(cfg.root)
+    raw_dir = layer_year_dir(root, "raw", cfg.dataset, year)
+    manifest = _read_json(raw_dir / "manifest.json") or {}
+    metadata = _read_json(raw_dir / "metadata.json") or {}
+    primary_file = _raw_primary_file(raw_dir, manifest)
+
+    profile_hints = metadata.get("profile_hints")
+    if not isinstance(profile_hints, dict):
+        # Malformed or missing hints must not crash the inspection.
+        profile_hints = {}
+    profile_source = "metadata" if profile_hints else None
+    sniff_error: str | None = None
+
+    if not profile_hints and primary_file is not None:
+        # Best-effort fallback: any sniff failure is surfaced as a warning below.
+        try:
+            profile_hints = build_profile_hints(primary_file)
+            profile_source = "sniff"
+        except Exception as exc:
+            sniff_error = f"{type(exc).__name__}: {exc}"
+
+    columns_preview = profile_hints.get("columns_preview") or []
+    warnings = list(profile_hints.get("warnings") or [])
+    if sniff_error is not None:
+        warnings.append(f"profile_hint_fallback_failed: {sniff_error}")
+
+    return {
+        "year": year,
+        "raw_dir": str(raw_dir),
+        "raw_exists": raw_dir.exists(),
+        "primary_output_file": manifest.get("primary_output_file"),
+        "file_used": profile_hints.get("file_used"),
+        "profile_source": profile_source,
+        "encoding": profile_hints.get("encoding_suggested"),
+        "delim": profile_hints.get("delim_suggested"),
+        "decimal": profile_hints.get("decimal_suggested"),
+        "skip": profile_hints.get("skip_suggested"),
+        "header_line": profile_hints.get("header_line"),
+        "columns_count": len(columns_preview),
+        "columns_preview": columns_preview,
+        "warnings": warnings,
+    }
+
+
+def _compare_schema_entries(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
+    """Compare each entry with the next one and report column differences.
+
+    The comparison is set-based: column order and duplicates are ignored.
+    """
+    comparisons: list[dict[str, Any]] = []
+    for previous, current in zip(entries, entries[1:]):
+        previous_columns = set(previous.get("columns_preview") or [])
+        current_columns = set(current.get("columns_preview") or [])
+        comparisons.append(
+            {
+                "from_year": previous["year"],
+                "to_year": current["year"],
+                "from_columns_count": previous.get("columns_count") or 0,
+                "to_columns_count": current.get("columns_count") or 0,
+                "added_columns": sorted(current_columns - previous_columns),
+                "removed_columns": sorted(previous_columns - current_columns),
+                "changed": previous_columns != current_columns,
+            }
+        )
+    return comparisons
+
+
 def _raw_output_paths(root: Path, dataset: str, year: int) -> dict[str, str]:
     raw_dir = layer_year_dir(root, "raw", dataset, year)
     return {
@@ -168,7 +253,85 @@ def paths(
         typer.echo("")
 
 
+def schema_diff(
+    config: str = typer.Option(..., "--config", "-c", help="Path to dataset.yml"),
+    as_json: bool = typer.Option(False, "--json", help="Emit JSON output"),
+    strict_config: bool = typer.Option(False, "--strict-config", help="Treat deprecated config forms as errors"),
+):
+    """
+    Confronta i principali segnali di schema RAW tra gli anni configurati.
+    """
+    # The docstring stays in Italian: Typer prints it as the command help text.
+    # Presumably guards direct (non-CLI) calls, where the default is still the
+    # typer.Option object rather than a bool.
+    strict_config_flag = strict_config if isinstance(strict_config, bool) else False
+    cfg = load_config(config, strict_config=strict_config_flag)
+    entries = [_raw_schema_payload(cfg, selected_year) for selected_year in iter_years(cfg, None)]
+    comparisons = _compare_schema_entries(entries)
+    payload = {
+        "dataset": cfg.dataset,
+        # Report the file actually passed via --config rather than assuming it
+        # is named dataset.yml (assumes cfg.base_dir is its directory).
+        "config_path": str(cfg.base_dir / Path(config).name),
+        "years": [entry["year"] for entry in entries],
+        "entries": entries,
+        "comparisons": comparisons,
+    }
+
+    if as_json:
+        typer.echo(json.dumps(payload, indent=2, ensure_ascii=False))
+        return
+
+    # Human-readable report: one section per year, then the pairwise comparisons.
+    typer.echo(f"dataset: {payload['dataset']}")
+    typer.echo(f"config_path: {payload['config_path']}")
+    typer.echo(f"years: {', '.join(str(year) for year in payload['years'])}")
+    typer.echo("")
+
+    for entry in entries:
+        typer.echo(f"year: {entry['year']}")
+        typer.echo(f" raw_exists: {entry['raw_exists']}")
+        typer.echo(f" raw_dir: {entry['raw_dir']}")
+        typer.echo(f" primary_output_file: {entry['primary_output_file']}")
+        typer.echo(f" profile_source: {entry['profile_source']}")
+        typer.echo(f" encoding: {entry['encoding']}")
+        typer.echo(f" delim: {entry['delim']}")
+        typer.echo(f" decimal: {entry['decimal']}")
+        typer.echo(f" skip: {entry['skip']}")
+        typer.echo(f" columns_count: {entry['columns_count']}")
+        typer.echo(f" header_line: {entry['header_line']}")
+        if entry["columns_preview"]:
+            typer.echo(" columns_preview:")
+            for column in entry["columns_preview"]:
+                typer.echo(f" - {column}")
+        if entry["warnings"]:
+            typer.echo(" warnings:")
+            for warning in entry["warnings"]:
+                typer.echo(f" - {warning}")
+        typer.echo("")
+
+    if comparisons:
+        typer.echo("comparisons:")
+        for comparison in comparisons:
+            typer.echo(f" {comparison['from_year']} -> {comparison['to_year']}:")
+            typer.echo(
+                f" counts: {comparison['from_columns_count']} -> {comparison['to_columns_count']}"
+            )
+            typer.echo(f" changed: {comparison['changed']}")
+            if comparison["added_columns"]:
+                typer.echo(" added_columns:")
+                for column in comparison["added_columns"]:
+                    typer.echo(f" - {column}")
+            if comparison["removed_columns"]:
+                typer.echo(" removed_columns:")
+                for column in comparison["removed_columns"]:
+                    typer.echo(f" - {column}")
+    else:
+        typer.echo("comparisons: none")
+
+
 def register(app: typer.Typer) -> None:
     inspect_app = typer.Typer(no_args_is_help=True, add_completion=False)
     inspect_app.command("paths")(paths)
+    inspect_app.command("schema-diff")(schema_diff)
     app.add_typer(inspect_app, name="inspect")