Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ Per il percorso base:
- `validate all` esegue i quality checks su CLEAN e MART
- `status` legge il run record e mostra lo stato più recente
- `inspect paths` espone i path stabili per notebook e script locali, insieme ai principali hints del RAW
- `inspect schema-diff` confronta i principali segnali di schema RAW tra gli anni configurati
- `--dry-run` valida config e SQL senza eseguire la pipeline

Esempi:
Expand All @@ -227,6 +228,7 @@ toolkit run all --config dataset.yml --strict-config
toolkit validate all --config dataset.yml --strict-config
toolkit status --dataset my_dataset --year 2024 --latest --config dataset.yml
toolkit inspect paths --config dataset.yml --year 2024 --json
toolkit inspect schema-diff --config dataset.yml --json
toolkit run all --config dataset.yml --dry-run --strict-config
```

Expand Down
1 change: 1 addition & 0 deletions docs/advanced-workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Uso consigliato:

- repo dataset nuovi: configurazione esplicita e `--strict-config`
- `profile raw` solo se serve capire meglio il formato RAW
- `inspect schema-diff` quando vuoi confrontare rapidamente hints e colonne tra più anni senza aprire a mano i metadata RAW

## Artifact policy

Expand Down
120 changes: 120 additions & 0 deletions tests/test_cli_inspect_schema_diff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from __future__ import annotations

import json
from pathlib import Path

from typer.testing import CliRunner

from toolkit.cli.app import app


def _write_dataset_config(base_dir: Path) -> Path:
config_path = base_dir / "dataset.yml"
config_path.write_text(
"\n".join(
[
'root: "./_out"',
"",
"dataset:",
' name: "schema_diff_example"',
" years: [2022, 2023]",
"",
"raw:",
" sources:",
' - name: "dummy"',
' type: "local_file"',
" args:",
' path: "input.csv"',
' filename: "input_{year}.csv"',
"",
"clean:",
' sql: "sql/clean.sql"',
" validate: {}",
"",
"mart:",
" tables: []",
" validate: {}",
"",
]
)
+ "\n",
encoding="utf-8",
)
return config_path


def _write_raw_year(
root: Path,
year: int,
file_name: str,
header_line: str,
*,
delim: str = ",",
) -> None:
raw_dir = root / "data" / "raw" / "schema_diff_example" / str(year)
raw_dir.mkdir(parents=True, exist_ok=True)
(raw_dir / file_name).write_text(header_line + "\n1,2,3\n", encoding="utf-8")
(raw_dir / "manifest.json").write_text(
json.dumps({"primary_output_file": file_name}),
encoding="utf-8",
)
(raw_dir / "metadata.json").write_text(
json.dumps(
{
"profile_hints": {
"file_used": file_name,
"encoding_suggested": "utf-8",
"delim_suggested": delim,
"decimal_suggested": ".",
"skip_suggested": 0,
"header_line": header_line,
"columns_preview": header_line.split(delim),
"warnings": [],
}
}
),
encoding="utf-8",
)


def test_inspect_schema_diff_reports_multi_year_changes(tmp_path: Path, monkeypatch) -> None:
    """The text report lists both years and the column added in 2023."""
    config_path = _write_dataset_config(tmp_path)
    out_root = tmp_path / "_out"
    headers_by_year = {
        2022: "anno,comune,imponibile",
        2023: "anno,comune,imponibile,contribuenti",
    }
    for year, header in headers_by_year.items():
        _write_raw_year(out_root, year, f"input_{year}.csv", header)

    runner = CliRunner()
    # The config uses a relative root ("./_out"), so run from tmp_path.
    monkeypatch.chdir(tmp_path)

    cli_args = ["inspect", "schema-diff", "--config", str(config_path), "--strict-config"]
    result = runner.invoke(app, cli_args)

    assert result.exit_code == 0, result.output
    expected_fragments = (
        "dataset: schema_diff_example",
        "year: 2022",
        "year: 2023",
        "2022 -> 2023:",
        "counts: 3 -> 4",
        "added_columns:",
        "contribuenti",
    )
    for fragment in expected_fragments:
        assert fragment in result.output


def test_inspect_schema_diff_json_degrades_when_raw_is_missing(tmp_path: Path, monkeypatch) -> None:
    """With no RAW data on disk the JSON report still succeeds, with empty entries."""
    config_path = _write_dataset_config(tmp_path)

    runner = CliRunner()
    monkeypatch.chdir(tmp_path)

    cli_args = ["inspect", "schema-diff", "--config", str(config_path), "--json", "--strict-config"]
    result = runner.invoke(app, cli_args)

    assert result.exit_code == 0, result.output
    payload = json.loads(result.output)
    assert payload["dataset"] == "schema_diff_example"
    assert payload["years"] == [2022, 2023]
    assert len(payload["entries"]) == 2

    # No RAW directory was created, so the first entry carries only defaults.
    first_entry = payload["entries"][0]
    assert first_entry["raw_exists"] is False
    assert first_entry["primary_output_file"] is None
    assert first_entry["columns_count"] == 0

    first_comparison = payload["comparisons"][0]
    assert first_comparison["changed"] is False
142 changes: 142 additions & 0 deletions toolkit/cli/cmd_inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from toolkit.cli.common import iter_years
from toolkit.core.config import load_config
from toolkit.core.paths import layer_year_dir
from toolkit.profile.raw import build_profile_hints
from toolkit.core.run_context import get_run_dir, latest_run


Expand All @@ -19,6 +20,75 @@ def _read_json(path: Path) -> dict[str, Any] | None:
return None


def _raw_primary_file(raw_dir: Path, manifest: dict[str, Any]) -> Path | None:
primary_output_file = manifest.get("primary_output_file")
if isinstance(primary_output_file, str):
candidate = raw_dir / primary_output_file
if candidate.exists():
return candidate
return None


def _raw_schema_payload(cfg, year: int) -> dict[str, Any]:
    """Collect the RAW schema signals for one configured year.

    Profile hints come from ``metadata.json`` when it has a non-empty
    ``profile_hints`` entry. Otherwise, if the manifest's primary output file
    resolves, they are rebuilt with ``build_profile_hints``. A failure of that
    fallback is not raised; it is reported as an extra warning in the result.

    Args:
        cfg: Loaded config; only ``cfg.root`` and ``cfg.dataset`` are read here.
        year: Year whose RAW directory is inspected.

    Returns:
        A dict with the year, RAW directory info, the hint source
        (``"metadata"``, ``"sniff"`` or ``None``), the individual hint values,
        the column preview with its count, and the collected warnings.
    """
    raw_dir = layer_year_dir(Path(cfg.root), "raw", cfg.dataset, year)
    manifest = _read_json(raw_dir / "manifest.json") or {}
    metadata = _read_json(raw_dir / "metadata.json") or {}
    primary_file = _raw_primary_file(raw_dir, manifest)

    hints = metadata.get("profile_hints") or {}
    hint_source = "metadata" if hints else None
    fallback_warnings: list[str] = []

    if primary_file is not None and not hints:
        # No stored hints: sniff the primary file directly, best effort.
        try:
            hints = build_profile_hints(primary_file)
        except Exception as exc:
            fallback_warnings.append(
                f"profile_hint_fallback_failed: {type(exc).__name__}: {exc}"
            )
        else:
            hint_source = "sniff"

    columns = hints.get("columns_preview") or []
    # Copy before extending so the hints' own warning list is never mutated.
    all_warnings = list(hints.get("warnings") or [])
    all_warnings.extend(fallback_warnings)

    return {
        "year": year,
        "raw_dir": str(raw_dir),
        "raw_exists": raw_dir.exists(),
        "primary_output_file": manifest.get("primary_output_file"),
        "file_used": hints.get("file_used"),
        "profile_source": hint_source,
        "encoding": hints.get("encoding_suggested"),
        "delim": hints.get("delim_suggested"),
        "decimal": hints.get("decimal_suggested"),
        "skip": hints.get("skip_suggested"),
        "header_line": hints.get("header_line"),
        "columns_count": len(columns),
        "columns_preview": columns,
        "warnings": all_warnings,
    }


def _compare_schema_entries(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
comparisons: list[dict[str, Any]] = []
for previous, current in zip(entries, entries[1:]):
previous_columns = set(previous.get("columns_preview") or [])
current_columns = set(current.get("columns_preview") or [])
comparisons.append(
{
"from_year": previous["year"],
"to_year": current["year"],
"from_columns_count": previous.get("columns_count") or 0,
"to_columns_count": current.get("columns_count") or 0,
"added_columns": sorted(current_columns - previous_columns),
"removed_columns": sorted(previous_columns - current_columns),
"changed": previous_columns != current_columns,
}
)
return comparisons


def _raw_output_paths(root: Path, dataset: str, year: int) -> dict[str, str]:
raw_dir = layer_year_dir(root, "raw", dataset, year)
return {
Expand Down Expand Up @@ -168,7 +238,79 @@ def paths(
typer.echo("")


def schema_diff(
    config: str = typer.Option(..., "--config", "-c", help="Path to dataset.yml"),
    as_json: bool = typer.Option(False, "--json", help="Emit JSON output"),
    strict_config: bool = typer.Option(False, "--strict-config", help="Treat deprecated config forms as errors"),
):
    """
    Confronta i principali segnali di schema RAW tra gli anni configurati.
    """
    # The docstring above is kept verbatim because Typer uses it as the
    # command's help text. In English: compare the main RAW schema signals
    # across the configured years.
    #
    # Output is either one JSON document (--json) or a text report with one
    # section per year followed by the year-to-year comparisons.

    # Anything that is not a real bool is treated as False.
    # NOTE(review): presumably covers direct Python calls, where the parameter
    # still holds the typer.Option default object. Confirm with callers.
    strict_flag = isinstance(strict_config, bool) and strict_config
    cfg = load_config(config, strict_config=strict_flag)

    entries = [_raw_schema_payload(cfg, year) for year in iter_years(cfg, None)]
    comparisons = _compare_schema_entries(entries)
    payload = {
        "dataset": cfg.dataset,
        "config_path": str(cfg.base_dir / "dataset.yml"),
        "years": [entry["year"] for entry in entries],
        "entries": entries,
        "comparisons": comparisons,
    }

    if as_json:
        typer.echo(json.dumps(payload, indent=2, ensure_ascii=False))
        return

    def _echo_bullets(title: str, items) -> None:
        # Print a titled bullet list; nothing at all when the list is empty.
        if not items:
            return
        typer.echo(f" {title}:")
        for item in items:
            typer.echo(f" - {item}")

    years_text = ", ".join(str(year) for year in payload["years"])
    typer.echo(f"dataset: {payload['dataset']}")
    typer.echo(f"config_path: {payload['config_path']}")
    typer.echo(f"years: {years_text}")
    typer.echo("")

    # Scalar fields are printed in this fixed order, each labelled by its key.
    scalar_fields = (
        "raw_exists",
        "raw_dir",
        "primary_output_file",
        "profile_source",
        "encoding",
        "delim",
        "decimal",
        "skip",
        "columns_count",
        "header_line",
    )
    for entry in entries:
        typer.echo(f"year: {entry['year']}")
        for field in scalar_fields:
            typer.echo(f" {field}: {entry[field]}")
        _echo_bullets("columns_preview", entry["columns_preview"])
        _echo_bullets("warnings", entry["warnings"])
        typer.echo("")

    if not comparisons:
        typer.echo("comparisons: none")
        return

    typer.echo("comparisons:")
    for diff in comparisons:
        typer.echo(f" {diff['from_year']} -> {diff['to_year']}:")
        typer.echo(f" counts: {diff['from_columns_count']} -> {diff['to_columns_count']}")
        typer.echo(f" changed: {diff['changed']}")
        _echo_bullets("added_columns", diff["added_columns"])
        _echo_bullets("removed_columns", diff["removed_columns"])


def register(app: typer.Typer) -> None:
    """Mount the ``inspect`` sub-application and its commands on ``app``.

    Args:
        app: Parent Typer application that receives the ``inspect`` group.
    """
    inspect_app = typer.Typer(no_args_is_help=True, add_completion=False)
    # Commands are registered in this order: paths, then schema-diff.
    subcommands = (
        ("paths", paths),
        ("schema-diff", schema_diff),
    )
    for command_name, handler in subcommands:
        inspect_app.command(command_name)(handler)
    app.add_typer(inspect_app, name="inspect")