Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ Per il percorso base:
- `run all` esegue RAW -> CLEAN -> MART
- `validate all` esegue i quality checks su CLEAN e MART
- `status` legge il run record e mostra lo stato piu` recente
- `inspect paths` espone i path stabili per notebook e script locali
- `inspect paths` espone i path stabili per notebook e script locali, insieme ai principali hints del RAW
- `--dry-run` valida config e SQL senza eseguire la pipeline

Esempi:
Expand All @@ -242,6 +242,7 @@ In pratica:
- CLEAN: `root/data/clean/<dataset>/<year>/`
- MART: `root/data/mart/<dataset>/<year>/`
- run records: `root/data/_runs/<dataset>/<year>/`
- hints RAW: `root/data/raw/<dataset>/<year>/_profile/suggested_read.yml`

Helper ufficiale per evitare path logic duplicata nei notebook:

Expand All @@ -254,7 +255,7 @@ Ruoli stabili degli output:
- `metadata.json`: payload ricco del layer
- `manifest.json`: summary stabile del layer con puntatori a metadata e validation
- `data/_runs/.../<run_id>.json`: stato del run usato da `status` e `resume`
- `inspect paths --json`: discovery helper read-only per notebook e script locali
- `inspect paths --json`: discovery helper read-only per notebook e script locali, con blocco `raw_hints`

Questo mantiene il contratto semplice tra toolkit e repo dataset:

Expand Down
5 changes: 5 additions & 0 deletions docs/advanced-workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ Artefatti principali:
- `raw/<dataset>/<year>/_profile/raw_profile.json`
- `raw/<dataset>/<year>/_profile/suggested_read.yml`

Nota pratica:

- `run raw` scrive gia` un `suggested_read.yml` leggero e conservativo quando il file primario e` profilabile
- `profile raw` resta il comando da usare quando vuoi profiling piu` ricco, report diagnostici e `suggested_mapping.yml`

`profile.json` resta un alias legacy opzionale e non e` il nome canonico da promuovere nei nuovi repo.

## CLEAN read e input selection
Expand Down
2 changes: 2 additions & 0 deletions docs/conventions.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Questa pagina raccoglie i contratti operativi stabili del toolkit per la pipelin
- Il file JSON canonico e` `raw_profile.json`, ma viene scritto solo per policy `standard|debug`.
- `profile.json` resta un alias di compatibilita` opzionale, controllato da `output.legacy_aliases`.
- `suggested_read.yml` e` il contratto usato da CLEAN per i format hints e resta richiesto solo quando `clean.read.source: auto`.
- `run raw` puo` scrivere un `suggested_read.yml` conservativo gia` nel percorso canonico.
- `profile raw` puo` rigenerare lo stesso file insieme ad artefatti diagnostici piu` ricchi.
- `suggested_mapping.yml` resta un artefatto diagnostico opzionale per uso umano; non e` un input del runtime canonico del toolkit.

## Artifacts Policy
Expand Down
22 changes: 20 additions & 2 deletions tests/test_artifacts_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,16 @@ def test_artifacts_policy_minimal_skips_optional_outputs(tmp_path: Path, monkeyp
year = cfg.years[0]
logger = _NoopLogger()

run_raw(cfg.dataset, year, cfg.root, cfg.raw, logger, base_dir=cfg.base_dir)
run_raw(
cfg.dataset,
year,
cfg.root,
cfg.raw,
logger,
base_dir=cfg.base_dir,
output_cfg=cfg.output,
clean_cfg=cfg.clean,
)
profile_cmd(step="raw", config=str(config_path))
run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
Expand Down Expand Up @@ -96,7 +105,16 @@ def test_artifacts_policy_standard_keeps_current_debug_artifacts(tmp_path: Path,
year = cfg.years[0]
logger = _NoopLogger()

run_raw(cfg.dataset, year, cfg.root, cfg.raw, logger, base_dir=cfg.base_dir)
run_raw(
cfg.dataset,
year,
cfg.root,
cfg.raw,
logger,
base_dir=cfg.base_dir,
output_cfg=cfg.output,
clean_cfg=cfg.clean,
)
profile_cmd(step="raw", config=str(config_path))
run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
Expand Down
6 changes: 6 additions & 0 deletions tests/test_cli_inspect_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def test_inspect_paths_reports_dataset_repo_layout_from_other_cwd(tmp_path: Path
assert f"clean_output: {dst / '_smoke_out' / 'data' / 'clean' / 'project_example' / '2022' / 'project_example_2022_clean.parquet'}" in result.output
assert f"clean_validation: {dst / '_smoke_out' / 'data' / 'clean' / 'project_example' / '2022' / '_validate' / 'clean_validation.json'}" in result.output
assert f"mart_manifest: {dst / '_smoke_out' / 'data' / 'mart' / 'project_example' / '2022' / 'manifest.json'}" in result.output
assert "raw_hints:" in result.output
assert "primary_output_file:" in result.output
assert "suggested_read_exists: True" in result.output
assert "latest_run_status: SUCCESS" in result.output


Expand Down Expand Up @@ -63,4 +66,7 @@ def test_inspect_paths_json_is_notebook_friendly(tmp_path: Path, monkeypatch) ->
assert payload["paths"]["raw"]["manifest"].endswith("manifest.json")
assert payload["paths"]["mart"]["outputs"]
assert payload["paths"]["mart"]["metadata"].endswith("metadata.json")
assert payload["raw_hints"]["primary_output_file"] is None
assert payload["raw_hints"]["suggested_read_exists"] is False
assert payload["raw_hints"]["suggested_read_path"].endswith("suggested_read.yml")
assert payload["latest_run"] is None
71 changes: 71 additions & 0 deletions tests/test_cli_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,74 @@ def test_status_uses_same_run_dir_as_writer(tmp_path: Path, monkeypatch) -> None
assert result.exit_code == 0
assert f"run_id: {written_run_id}" in result.output
assert "status: DRY_RUN" in result.output


def test_status_reports_raw_hints_when_raw_artifacts_exist(tmp_path: Path, monkeypatch) -> None:
    """`status` surfaces a `raw_hints:` section when RAW artifacts exist on disk.

    Arrange: build a minimal dataset repo (config + SQL stubs) plus pre-baked
    RAW artifacts — `manifest.json` (primary output pointer), `metadata.json`
    (with `profile_hints`), a `suggested_read.yml`, and one SUCCESS run record.
    Act: invoke the `status` CLI command with `--latest`.
    Assert: the output echoes the hints sourced from those three artifacts.
    """
    project_dir = tmp_path / "project"
    raw_dir = project_dir / "out" / "data" / "raw" / "demo_ds" / "2022"
    raw_dir.mkdir(parents=True)
    # `_profile/` must exist so suggested_read.yml can be written below.
    (raw_dir / "_profile").mkdir(parents=True)
    config_path = project_dir / "dataset.yml"

    # Minimal valid dataset config; `root` is resolved relative to the config.
    config_path.write_text(
        """
root: "./out"
dataset:
  name: demo_ds
  years: [2022]
raw: {}
clean:
  sql: "sql/clean.sql"
mart:
  tables:
    - name: mart_example
      sql: "sql/mart/mart_example.sql"
""".strip(),
        encoding="utf-8",
    )

    # The SQL files only need to exist for config loading; content is unused here.
    sql_dir = project_dir / "sql" / "mart"
    sql_dir.mkdir(parents=True, exist_ok=True)
    (project_dir / "sql" / "clean.sql").write_text("select 1 as value", encoding="utf-8")
    (sql_dir / "mart_example.sql").write_text("select * from clean_input", encoding="utf-8")

    # manifest.json feeds `primary_output_file` in the raw_hints block.
    (raw_dir / "manifest.json").write_text(
        json.dumps({"primary_output_file": "demo.csv"}, indent=2),
        encoding="utf-8",
    )
    # metadata.json `profile_hints` feeds encoding/delim/decimal/skip/warnings.
    (raw_dir / "metadata.json").write_text(
        json.dumps(
            {
                "profile_hints": {
                    "encoding_suggested": "utf-8",
                    "delim_suggested": ";",
                    "decimal_suggested": None,
                    "skip_suggested": 1,
                    "warnings": ["header_preamble_detected"],
                }
            },
            indent=2,
        ),
        encoding="utf-8",
    )
    # Presence of this file drives `suggested_read_exists: True`; content is not parsed by `status`.
    (raw_dir / "_profile" / "suggested_read.yml").write_text("clean:\n read:\n delim: \";\"\n", encoding="utf-8")

    # A run record is required so `status --latest` has something to report.
    run_dir = get_run_dir(project_dir / "out", "demo_ds", 2022)
    _write_run_record(run_dir / "run-123.json", "run-123", "2026-03-04T10:00:00+00:00", "SUCCESS")

    # Run from outside the project dir to prove --config path resolution works.
    monkeypatch.chdir(tmp_path)
    runner = CliRunner()

    result = runner.invoke(
        app,
        ["status", "--dataset", "demo_ds", "--year", "2022", "--latest", "--config", str(config_path)],
    )

    assert result.exit_code == 0
    assert "raw_hints:" in result.output
    assert "primary_output_file: demo.csv" in result.output
    assert "suggested_read_exists: True" in result.output
    assert "encoding: utf-8" in result.output
    assert "delim: ;" in result.output
    assert "skip: 1" in result.output
    assert "header_preamble_detected" in result.output
23 changes: 21 additions & 2 deletions tests/test_project_example_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,16 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch):
year = cfg.years[0]
logger = _NoopLogger()

run_raw(cfg.dataset, year, cfg.root, cfg.raw, logger, base_dir=cfg.base_dir)
run_raw(
cfg.dataset,
year,
cfg.root,
cfg.raw,
logger,
base_dir=cfg.base_dir,
output_cfg=cfg.output,
clean_cfg=cfg.clean,
)
run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
validate_cmd(step="clean", config=str(dst / "dataset.yml"))
Expand All @@ -64,6 +73,7 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch):
assert (raw_dir / "raw_validation.json").exists()
assert (raw_dir / "metadata.json").exists()
assert (raw_dir / "manifest.json").exists()
assert (raw_dir / "_profile" / "suggested_read.yml").exists()
assert clean_parquet.exists()
assert (clean_dir / "metadata.json").exists()
assert (clean_dir / "manifest.json").exists()
Expand Down Expand Up @@ -179,7 +189,16 @@ def test_project_example_outputs_can_be_replaced_after_run(tmp_path: Path, monke
year = cfg.years[0]
logger = _NoopLogger()

run_raw(cfg.dataset, year, cfg.root, cfg.raw, logger, base_dir=cfg.base_dir)
run_raw(
cfg.dataset,
year,
cfg.root,
cfg.raw,
logger,
base_dir=cfg.base_dir,
output_cfg=cfg.output,
clean_cfg=cfg.clean,
)
run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)

Expand Down
34 changes: 34 additions & 0 deletions toolkit/cli/cmd_inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
from toolkit.core.run_context import get_run_dir, latest_run


def _read_json(path: Path) -> dict[str, Any] | None:
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None


def _raw_output_paths(root: Path, dataset: str, year: int) -> dict[str, str]:
raw_dir = layer_year_dir(root, "raw", dataset, year)
return {
Expand Down Expand Up @@ -56,6 +63,11 @@ def _payload_for_year(cfg, year: int) -> dict[str, Any]:
root = Path(cfg.root)
run_dir = get_run_dir(root, cfg.dataset, year)
mart_tables = cfg.mart.get("tables") or []
raw_dir = layer_year_dir(root, "raw", cfg.dataset, year)
raw_manifest = _read_json(raw_dir / "manifest.json") or {}
raw_metadata = _read_json(raw_dir / "metadata.json") or {}
suggested_read_path = raw_dir / "_profile" / "suggested_read.yml"
profile_hints = raw_metadata.get("profile_hints") or {}

latest_payload: dict[str, Any] | None = None
try:
Expand All @@ -80,6 +92,16 @@ def _payload_for_year(cfg, year: int) -> dict[str, Any]:
"mart": _mart_paths(root, cfg.dataset, year, mart_tables),
"run_dir": str(run_dir),
},
"raw_hints": {
"primary_output_file": raw_manifest.get("primary_output_file"),
"suggested_read_path": str(suggested_read_path),
"suggested_read_exists": suggested_read_path.exists(),
"encoding": profile_hints.get("encoding_suggested"),
"delim": profile_hints.get("delim_suggested"),
"decimal": profile_hints.get("decimal_suggested"),
"skip": profile_hints.get("skip_suggested"),
"warnings": profile_hints.get("warnings") or [],
},
"latest_run": latest_payload,
}

Expand Down Expand Up @@ -111,6 +133,18 @@ def paths(
typer.echo(f"raw_manifest: {item['paths']['raw']['manifest']}")
typer.echo(f"raw_metadata: {item['paths']['raw']['metadata']}")
typer.echo(f"raw_validation: {item['paths']['raw']['validation']}")
typer.echo("raw_hints:")
typer.echo(f" - primary_output_file: {item['raw_hints']['primary_output_file']}")
typer.echo(f" - suggested_read_exists: {item['raw_hints']['suggested_read_exists']}")
typer.echo(f" - suggested_read_path: {item['raw_hints']['suggested_read_path']}")
typer.echo(f" - encoding: {item['raw_hints']['encoding']}")
typer.echo(f" - delim: {item['raw_hints']['delim']}")
typer.echo(f" - decimal: {item['raw_hints']['decimal']}")
typer.echo(f" - skip: {item['raw_hints']['skip']}")
if item["raw_hints"]["warnings"]:
typer.echo(" - warnings:")
for warning in item["raw_hints"]["warnings"]:
typer.echo(f" - {warning}")
typer.echo(f"clean_dir: {item['paths']['clean']['dir']}")
typer.echo(f"clean_output: {item['paths']['clean']['output']}")
typer.echo(f"clean_manifest: {item['paths']['clean']['manifest']}")
Expand Down
20 changes: 6 additions & 14 deletions toolkit/cli/cmd_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
from toolkit.cli.common import iter_years, load_cfg_and_logger
from toolkit.core.artifacts import resolve_artifact_policy, should_write
from toolkit.core.paths import layer_year_dir
from toolkit.profile.raw import build_suggested_read_cfg, profile_raw, write_raw_profile
from toolkit.profile.raw import (
build_suggested_read_cfg,
profile_raw,
write_raw_profile,
write_suggested_read_yml,
)


def render_profile_md(profile: dict[str, Any]) -> str:
Expand Down Expand Up @@ -93,19 +98,6 @@ def _yml_scalar(v: Any) -> str:
return str(v)


def write_suggested_read_yml(out_dir: Path, profile: dict[str, Any]) -> Path:
    """Render the suggested CLEAN read config as `suggested_read.yml`.

    Derives the hint mapping from *profile* via `build_suggested_read_cfg`
    and writes it nested under `clean: read:` so the file can be consumed
    directly as CLEAN format hints. Returns the written path.
    NOTE(review): this definition is removed by this PR in favor of the
    canonical `toolkit.profile.raw.write_suggested_read_yml` — confirm the
    two renderers stay byte-compatible.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    suggested_read = build_suggested_read_cfg(profile)

    # Scalars are formatted by `_yml_scalar` to keep YAML quoting consistent.
    lines = ["clean:", " read:"]
    for key, value in suggested_read.items():
        lines.append(f" {key}: {_yml_scalar(value)}")

    p = out_dir / "suggested_read.yml"
    p.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return p


def write_suggested_mapping_yml(out_dir: Path, profile: dict[str, Any]) -> Path:
out_dir.mkdir(parents=True, exist_ok=True)
mapping = profile.get("mapping_suggestions") or {}
Expand Down
2 changes: 2 additions & 0 deletions toolkit/cli/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ def _execute_layer(layer_name: str, target, *args, **kwargs) -> None:
base_dir=cfg.base_dir,
run_id=context.run_id,
strict_plugins=bool((getattr(cfg, "config", {}) or {}).get("strict", False)),
output_cfg=cfg.output,
clean_cfg=cfg.clean,
)

if "clean" in layers_to_run:
Expand Down
43 changes: 43 additions & 0 deletions toolkit/cli/cmd_status.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import annotations

import json
from pathlib import Path

import typer

from toolkit.core.config import load_config
from toolkit.core.paths import layer_year_dir
from toolkit.core.run_context import get_run_dir, latest_run, read_run_record


Expand All @@ -19,6 +23,31 @@ def _layer_row(record: dict[str, object], layer: str) -> str:
)


def _read_json(path: Path) -> dict[str, object] | None:
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None


def _raw_hints(root: Path, dataset: str, year: int) -> dict[str, object]:
    """Collect read-only RAW discovery hints for one dataset/year.

    Pulls the primary output pointer from `manifest.json`, the read hints
    from `metadata.json` (`profile_hints`), and reports whether the
    canonical `suggested_read.yml` exists. All lookups are best-effort:
    absent artifacts degrade to None/empty values rather than raising.
    """
    raw_dir = layer_year_dir(root, "raw", dataset, year)
    manifest = _read_json(raw_dir / "manifest.json") or {}
    metadata = _read_json(raw_dir / "metadata.json") or {}
    hints = metadata.get("profile_hints") or {}
    suggested_read = raw_dir / "_profile" / "suggested_read.yml"

    payload: dict[str, object] = {
        "primary_output_file": manifest.get("primary_output_file"),
        "suggested_read_exists": suggested_read.exists(),
        "suggested_read_path": str(suggested_read),
    }
    # Map metadata hint keys onto the stable public hint names.
    for out_key, hint_key in (
        ("encoding", "encoding_suggested"),
        ("delim", "delim_suggested"),
        ("decimal", "decimal_suggested"),
        ("skip", "skip_suggested"),
    ):
        payload[out_key] = hints.get(hint_key)
    payload["warnings"] = hints.get("warnings") or []
    return payload


def status(
dataset: str = typer.Option(..., "--dataset", help="Dataset name"),
year: int = typer.Option(..., "--year", help="Dataset year"),
Expand Down Expand Up @@ -46,6 +75,20 @@ def status(
portability = record.get("_portability") or {}
if not portability.get("portable", True):
typer.echo("portable: False")
hints = _raw_hints(Path(cfg.root), dataset, year)
typer.echo("")
typer.echo("raw_hints:")
typer.echo(f" primary_output_file: {hints['primary_output_file']}")
typer.echo(f" suggested_read_exists: {hints['suggested_read_exists']}")
typer.echo(f" suggested_read_path: {hints['suggested_read_path']}")
typer.echo(f" encoding: {hints['encoding']}")
typer.echo(f" delim: {hints['delim']}")
typer.echo(f" decimal: {hints['decimal']}")
typer.echo(f" skip: {hints['skip']}")
if hints["warnings"]:
typer.echo(" warnings:")
for warning in hints["warnings"]:
typer.echo(f" - {warning}")
typer.echo("")
typer.echo("layer layer_status validation_passed errors_count warnings_count")
for layer in ("raw", "clean", "mart"):
Expand Down
Loading