Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions docs/config-schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ I path relativi sono sempre risolti rispetto alla directory che contiene `datase
| `raw` | `object` | no | configurazione acquisizione RAW |
| `clean` | `object` | no | configurazione CLEAN |
| `mart` | `object` | no | configurazione MART |
| `cross_year` | `object` | no | output opzionali multi-anno |
| `config` | `object` | no | policy parser config |
| `validation` | `object` | no | solo opzioni globali del validation gate |
| `output` | `object` | no | policy artefatti |
Expand Down Expand Up @@ -155,6 +156,37 @@ Note pratiche:
| `ranges` | `dict[str, RangeRule]` | `{}` |
| `min_rows` | `int \| null` | `null` |

## cross_year

`cross_year` definisce output opzionali multi-anno. Non entra nel loop annuale di `raw/clean/mart`.

L'esecuzione è esplicita:

```bash
py -m toolkit.cli.app run cross_year --config dataset.yml
```

Campi supportati:

| Campo | Tipo | Default |
|---|---|---|
| `cross_year.tables` | `list[CrossYearTable]` | `[]` |

`CrossYearTable`:

| Campo | Tipo | Default |
|---|---|---|
| `name` | `string` | nessuno |
| `sql` | `string` | nessuno |
| `source_layer` | `clean \| mart` | `clean` |
| `source_table` | `string \| null` | `null` |

Note pratiche:

- con `source_layer: clean`, il runner unisce tutti i parquet annuali del layer CLEAN e li espone come view `clean_input` e `clean`
- con `source_layer: mart`, `source_table` è obbligatorio; il runner legge `<year>/<source_table>.parquet` e lo espone come view `mart_input` e `mart`
- gli output vengono scritti in `root/data/cross/<dataset>/`

## validation

Campi supportati:
Expand Down Expand Up @@ -198,6 +230,7 @@ Con `config.strict: true` o `--strict-config`, gli stessi casi diventano errori.
| `DCL006` | `clean.sql_path` | `clean.sql` | ignored |
| `DCL007` | `mart.sql_dir` | `mart.tables[].sql` | ignored |
| `DCL008` | `bq` | rimuovere il campo | ignored |
| `DCL013` | `cross_year.* unknown keys` | rimuovere il campo | ignored |

## Esempi minimi

Expand Down Expand Up @@ -254,6 +287,22 @@ mart:
min_rows: 1
```

### CROSS_YEAR

Presuppone che i layer annuali richiesti esistano già sotto `root/data/clean/...` oppure `root/data/mart/...`.

```yaml
dataset:
  name: cross_demo
  years: [2022, 2023]

cross_year:
  tables:
    - name: clean_union
      sql: sql/cross/clean_union.sql
      source_layer: clean
```

## Errori config: come leggerli

Il parser restituisce errori con path del campo e messaggio.
Expand Down
7 changes: 7 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def test_load_config_resolves_relative_paths_from_dataset_dir(tmp_path: Path):
project_dir = tmp_path / "project"
project_dir.mkdir()
(project_dir / "sql" / "mart").mkdir(parents=True)
(project_dir / "sql" / "cross").mkdir(parents=True)

yml = project_dir / "dataset.yml"
yml.write_text(
Expand All @@ -73,6 +74,11 @@ def test_load_config_resolves_relative_paths_from_dataset_dir(tmp_path: Path):
tables:
- name: demo_mart
sql: "sql/mart/demo.sql"
cross_year:
tables:
- name: demo_cross
sql: "sql/cross/demo_cross.sql"
source_layer: clean
""".strip(),
encoding="utf-8",
)
Expand All @@ -85,6 +91,7 @@ def test_load_config_resolves_relative_paths_from_dataset_dir(tmp_path: Path):
assert cfg.raw["source"]["args"]["path"] == (project_dir / "data" / "raw.csv").resolve()
assert cfg.clean["sql"] == (project_dir / "sql" / "clean.sql").resolve()
assert cfg.mart["tables"][0]["sql"] == (project_dir / "sql" / "mart" / "demo.sql").resolve()
assert cfg.cross_year["tables"][0]["sql"] == (project_dir / "sql" / "cross" / "demo_cross.sql").resolve()


def test_load_config_does_not_transform_non_whitelisted_path_like_fields(tmp_path: Path):
Expand Down
47 changes: 47 additions & 0 deletions tests/test_cross_year.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from pathlib import Path
import shutil

from toolkit.cli.cmd_run import run as run_cmd


def test_cli_run_cross_year_on_project_example(tmp_path: Path, monkeypatch) -> None:
    """Smoke-test the ``cross_year`` step end to end on a copy of ``project-example``.

    The example project is copied into ``tmp_path``, its config is extended to
    two years and given one ``cross_year`` table that reads from the CLEAN
    layer. The annual pipeline (``all``) runs first, then the ``cross_year``
    step, and finally the expected artifacts are checked on disk.
    """
    # Anchor the example project to this test file (tests/ -> repo root) so the
    # test does not depend on the directory pytest was launched from.
    src = Path(__file__).resolve().parents[1] / "project-example"
    dst = tmp_path / "project-example"
    shutil.copytree(src, dst)

    config_path = dst / "dataset.yml"

    # SQL executed by the cross-year table; it reads the `clean_input` view.
    cross_sql_dir = dst / "sql" / "cross"
    cross_sql_dir.mkdir(parents=True, exist_ok=True)
    (cross_sql_dir / "clean_union.sql").write_text(
        "\n".join(
            [
                "select",
                " count(*) as rows_total,",
                " count(distinct anno) as anni_distinti",
                "from clean_input",
            ]
        ),
        encoding="utf-8",
    )

    config_text = config_path.read_text(encoding="utf-8")
    # NOTE(review): str.replace is a silent no-op if the example config does not
    # contain this exact text; confirm against project-example/dataset.yml.
    config_text = config_text.replace("years: [2022]", "years: [2022, 2023]")
    # The nesting below is significant: `tables` must be a list of mappings
    # under `cross_year`, so each level is indented one step further.
    config_text += (
        "\n"
        "cross_year:\n"
        "  tables:\n"
        '    - name: "clean_union"\n'
        '      sql: "sql/cross/clean_union.sql"\n'
        '      source_layer: "clean"\n'
    )
    config_path.write_text(config_text, encoding="utf-8")

    monkeypatch.chdir(dst)

    # Run the annual layers first, then the cross-year step on top of them.
    run_cmd(step="all", config=str(config_path))
    run_cmd(step="cross_year", config=str(config_path))

    cross_dir = dst / "_smoke_out" / "data" / "cross" / "project_example"
    assert (cross_dir / "clean_union.parquet").exists()
    assert (cross_dir / "metadata.json").exists()
    assert (cross_dir / "manifest.json").exists()
75 changes: 70 additions & 5 deletions toolkit/cli/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from toolkit.cli.common import iter_years, load_cfg_and_logger
from toolkit.clean.run import run_clean
from toolkit.clean.validate import run_clean_validation
from toolkit.cross.run import run_cross_year
from toolkit.core.logging import bind_logger, get_logger
from toolkit.core.paths import layer_year_dir
from toolkit.core.paths import layer_dataset_dir, layer_year_dir
from toolkit.core.run_context import RunContext
from toolkit.mart.run import run_mart
from toolkit.mart.validate import run_mart_validation
Expand All @@ -33,6 +34,8 @@ def _validation_runner(layer_name: str):
def _planned_layers(step: str) -> list[str]:
if step == "all":
return ["raw", "clean", "mart"]
if step == "cross_year":
return ["cross_year"]
return [step]


Expand Down Expand Up @@ -64,6 +67,19 @@ def _validate_execution_plan(cfg, step: str) -> list[str]:
if not sql_path.exists():
raise FileNotFoundError(f"MART SQL file not found: {sql_path}")

if "cross_year" in layers:
tables = cfg.cross_year.get("tables") or []
if not isinstance(tables, list) or not tables:
raise ValueError("cross_year.tables missing or empty in dataset.yml")
for table in tables:
if not isinstance(table, dict):
raise ValueError("Each entry in cross_year.tables must be a mapping (dict).")
sql_path = _resolve_sql_path(cfg, table.get("sql"))
if not sql_path.exists():
raise FileNotFoundError(f"CROSS_YEAR SQL file not found: {sql_path}")
if table.get("source_layer", "clean") == "mart" and not table.get("source_table"):
raise ValueError("cross_year.tables[].source_table is required when source_layer = mart")

return layers


Expand All @@ -89,10 +105,55 @@ def _print_execution_plan(cfg, year: int, layers: list[str], context: RunContext
typer.echo(f"run_record: {context.path}")
typer.echo("output_dirs:")
for layer in layers:
typer.echo(f" - {layer}: {layer_year_dir(cfg.root, layer, cfg.dataset, year)}")
if layer == "cross_year":
typer.echo(f" - {layer}: {layer_dataset_dir(cfg.root, 'cross', cfg.dataset)}")
else:
typer.echo(f" - {layer}: {layer_year_dir(cfg.root, layer, cfg.dataset, year)}")
typer.echo("")


def run_cross_year_step(
    cfg,
    *,
    dry_run: bool = False,
    logger=None,
) -> None:
    """Validate and run (or only describe) the ``cross_year`` step.

    Args:
        cfg: Loaded toolkit configuration; this function reads ``dataset``,
            ``years``, ``root``, ``root_source``, ``base_dir``, ``cross_year``
            and ``output`` from it.
        dry_run: When true, print the execution plan and return without
            running anything.
        logger: Logger to use; when ``None`` one is obtained via
            ``get_logger()``.

    Raises:
        Whatever ``_validate_execution_plan`` raises for an invalid
        ``cross_year`` configuration.
    """
    log = get_logger() if logger is None else logger

    # Validation and output-dir resolution happen before the dry-run branch,
    # so a broken config fails even when only the plan is requested.
    _validate_execution_plan(cfg, "cross_year")
    target_dir = layer_dataset_dir(cfg.root, "cross", cfg.dataset)

    if dry_run:
        plan_lines = (
            "Execution Plan",
            f"dataset: {cfg.dataset}",
            "scope: cross_year",
            "status: DRY_RUN",
            f"years: {', '.join(str(year) for year in cfg.years)}",
            "steps: cross_year",
            f"output_dir: {target_dir}",
            "",
        )
        for plan_line in plan_lines:
            typer.echo(plan_line)
        return

    years_csv = ",".join(str(year) for year in cfg.years)
    log.info(
        "RUN cross_year | dataset=%s years=%s base_dir=%s effective_root=%s root_source=%s",
        cfg.dataset,
        years_csv,
        cfg.base_dir,
        cfg.root,
        cfg.root_source,
    )
    run_cross_year(
        cfg.dataset,
        cfg.years,
        cfg.root,
        cfg.cross_year,
        log,
        base_dir=cfg.base_dir,
        output_cfg=cfg.output,
    )


def run_year(
cfg,
year: int,
Expand Down Expand Up @@ -199,7 +260,7 @@ def _execute_layer(layer_name: str, target, *args, **kwargs) -> None:


def run(
step: str = typer.Argument(..., help="raw | clean | mart | all"),
step: str = typer.Argument(..., help="raw | clean | mart | cross_year | all"),
config: str = typer.Option(..., "--config", "-c", help="Path to dataset.yml"),
dry_run: bool = typer.Option(False, "--dry-run", help="Print execution plan without executing"),
strict_config: bool = typer.Option(False, "--strict-config", help="Treat deprecated config forms as errors"),
Expand All @@ -211,8 +272,12 @@ def run(
cfg, logger = load_cfg_and_logger(config, strict_config=strict_config_flag)
dry_run_flag = dry_run if isinstance(dry_run, bool) else False

if step not in {"raw", "clean", "mart", "all"}:
raise typer.BadParameter("step must be one of: raw, clean, mart, all")
if step not in {"raw", "clean", "mart", "cross_year", "all"}:
raise typer.BadParameter("step must be one of: raw, clean, mart, cross_year, all")

if step == "cross_year":
run_cross_year_step(cfg, dry_run=dry_run_flag, logger=logger)
return

for year in iter_years(cfg, None):
run_year(cfg, year, step=step, dry_run=dry_run_flag, logger=logger)
Expand Down
10 changes: 10 additions & 0 deletions toolkit/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class ToolkitConfig:
raw: dict[str, Any]
clean: dict[str, Any]
mart: dict[str, Any]
cross_year: dict[str, Any]
config: dict[str, Any]
validation: dict[str, Any]
output: dict[str, Any]
Expand Down Expand Up @@ -70,6 +71,14 @@ def _compat_mart(model: ToolkitConfigModel) -> dict[str, Any]:
)


def _compat_cross_year(model: ToolkitConfigModel) -> dict[str, Any]:
    """Dump the ``cross_year`` section of ``model`` as a plain dict.

    The dump is requested in ``python`` mode with ``exclude_none`` and
    ``exclude_unset`` enabled.
    """
    dump_options = {
        "mode": "python",
        "exclude_none": True,
        "exclude_unset": True,
    }
    return model.cross_year.model_dump(**dump_options)


def load_config(path: str | Path, *, strict_config: bool = False) -> ToolkitConfig:
model = load_config_model(path, strict_config=strict_config)
return ToolkitConfig(
Expand All @@ -82,6 +91,7 @@ def load_config(path: str | Path, *, strict_config: bool = False) -> ToolkitConf
raw=_compat_raw(model),
clean=_compat_clean(model),
mart=_compat_mart(model),
cross_year=_compat_cross_year(model),
config=model.config.model_dump(mode="python"),
validation=model.validation.model_dump(mode="python"),
output=model.output.model_dump(mode="python"),
Expand Down
Loading