diff --git a/docs/config-schema.md b/docs/config-schema.md index 9ea11db..112a123 100644 --- a/docs/config-schema.md +++ b/docs/config-schema.md @@ -14,6 +14,7 @@ I path relativi sono sempre risolti rispetto alla directory che contiene `datase | `raw` | `object` | no | configurazione acquisizione RAW | | `clean` | `object` | no | configurazione CLEAN | | `mart` | `object` | no | configurazione MART | +| `cross_year` | `object` | no | output opzionali multi-anno | | `config` | `object` | no | policy parser config | | `validation` | `object` | no | solo opzioni globali del validation gate | | `output` | `object` | no | policy artefatti | @@ -155,6 +156,37 @@ Note pratiche: | `ranges` | `dict[str, RangeRule]` | `{}` | | `min_rows` | `int \| null` | `null` | +## cross_year + +`cross_year` definisce output opzionali multi-anno. Non entra nel loop annuale di `raw/clean/mart`. + +L'esecuzione è esplicita: + +```bash +py -m toolkit.cli.app run cross_year --config dataset.yml +``` + +Campi supportati: + +| Campo | Tipo | Default | +|---|---|---| +| `cross_year.tables` | `list[CrossYearTable]` | `[]` | + +`CrossYearTable`: + +| Campo | Tipo | Default | +|---|---|---| +| `name` | `string` | nessuno | +| `sql` | `string` | nessuno | +| `source_layer` | `clean \| mart` | `clean` | +| `source_table` | `string \| null` | `null` | + +Note pratiche: + +- con `source_layer: clean`, il runner unisce tutti i parquet annuali del layer CLEAN e li espone come view `clean_input` e `clean` +- con `source_layer: mart`, `source_table` è obbligatorio; il runner legge `root/data/mart/<dataset>/<year>/<source_table>.parquet` e lo espone come view `mart_input` e `mart` +- gli output vengono scritti in `root/data/cross/<dataset>/` + ## validation Campi supportati: @@ -198,6 +230,7 @@ Con `config.strict: true` o `--strict-config`, gli stessi casi diventano errori. 
| `DCL006` | `clean.sql_path` | `clean.sql` | ignored | | `DCL007` | `mart.sql_dir` | `mart.tables[].sql` | ignored | | `DCL008` | `bq` | rimuovere il campo | ignored | +| `DCL013` | `cross_year.* unknown keys` | rimuovere il campo | ignored | ## Esempi minimi @@ -254,6 +287,22 @@ mart: min_rows: 1 ``` +### CROSS_YEAR + +Presuppone che i layer annuali richiesti esistano gia sotto `root/data/clean/...` oppure `root/data/mart/...`. + +```yaml +dataset: + name: cross_demo + years: [2022, 2023] + +cross_year: + tables: + - name: clean_union + sql: sql/cross/clean_union.sql + source_layer: clean +``` + ## Errori config: come leggerli Il parser restituisce errori con path del campo e messaggio. diff --git a/tests/test_config.py b/tests/test_config.py index dfeadc6..a4e79c0 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -54,6 +54,7 @@ def test_load_config_resolves_relative_paths_from_dataset_dir(tmp_path: Path): project_dir = tmp_path / "project" project_dir.mkdir() (project_dir / "sql" / "mart").mkdir(parents=True) + (project_dir / "sql" / "cross").mkdir(parents=True) yml = project_dir / "dataset.yml" yml.write_text( @@ -73,6 +74,11 @@ def test_load_config_resolves_relative_paths_from_dataset_dir(tmp_path: Path): tables: - name: demo_mart sql: "sql/mart/demo.sql" +cross_year: + tables: + - name: demo_cross + sql: "sql/cross/demo_cross.sql" + source_layer: clean """.strip(), encoding="utf-8", ) @@ -85,6 +91,7 @@ def test_load_config_resolves_relative_paths_from_dataset_dir(tmp_path: Path): assert cfg.raw["source"]["args"]["path"] == (project_dir / "data" / "raw.csv").resolve() assert cfg.clean["sql"] == (project_dir / "sql" / "clean.sql").resolve() assert cfg.mart["tables"][0]["sql"] == (project_dir / "sql" / "mart" / "demo.sql").resolve() + assert cfg.cross_year["tables"][0]["sql"] == (project_dir / "sql" / "cross" / "demo_cross.sql").resolve() def test_load_config_does_not_transform_non_whitelisted_path_like_fields(tmp_path: Path): diff --git 
a/tests/test_cross_year.py b/tests/test_cross_year.py new file mode 100644 index 0000000..b42442c --- /dev/null +++ b/tests/test_cross_year.py @@ -0,0 +1,47 @@ +from pathlib import Path +import shutil + +from toolkit.cli.cmd_run import run as run_cmd + + +def test_cli_run_cross_year_on_project_example(tmp_path: Path, monkeypatch) -> None: + src = Path("project-example") + dst = tmp_path / "project-example" + shutil.copytree(src, dst) + + config_path = dst / "dataset.yml" + cross_sql_dir = dst / "sql" / "cross" + cross_sql_dir.mkdir(parents=True, exist_ok=True) + (cross_sql_dir / "clean_union.sql").write_text( + "\n".join( + [ + "select", + " count(*) as rows_total,", + " count(distinct anno) as anni_distinti", + "from clean_input", + ] + ), + encoding="utf-8", + ) + + config_text = config_path.read_text(encoding="utf-8") + config_text = config_text.replace("years: [2022]", "years: [2022, 2023]") + config_text += ( + "\n" + "cross_year:\n" + " tables:\n" + ' - name: "clean_union"\n' + ' sql: "sql/cross/clean_union.sql"\n' + ' source_layer: "clean"\n' + ) + config_path.write_text(config_text, encoding="utf-8") + + monkeypatch.chdir(dst) + + run_cmd(step="all", config=str(config_path)) + run_cmd(step="cross_year", config=str(config_path)) + + cross_dir = dst / "_smoke_out" / "data" / "cross" / "project_example" + assert (cross_dir / "clean_union.parquet").exists() + assert (cross_dir / "metadata.json").exists() + assert (cross_dir / "manifest.json").exists() diff --git a/toolkit/cli/cmd_run.py b/toolkit/cli/cmd_run.py index 0757acd..55a9f0d 100644 --- a/toolkit/cli/cmd_run.py +++ b/toolkit/cli/cmd_run.py @@ -7,8 +7,9 @@ from toolkit.cli.common import iter_years, load_cfg_and_logger from toolkit.clean.run import run_clean from toolkit.clean.validate import run_clean_validation +from toolkit.cross.run import run_cross_year from toolkit.core.logging import bind_logger, get_logger -from toolkit.core.paths import layer_year_dir +from toolkit.core.paths import 
layer_dataset_dir, layer_year_dir from toolkit.core.run_context import RunContext from toolkit.mart.run import run_mart from toolkit.mart.validate import run_mart_validation @@ -33,6 +34,8 @@ def _validation_runner(layer_name: str): def _planned_layers(step: str) -> list[str]: if step == "all": return ["raw", "clean", "mart"] + if step == "cross_year": + return ["cross_year"] return [step] @@ -64,6 +67,19 @@ def _validate_execution_plan(cfg, step: str) -> list[str]: if not sql_path.exists(): raise FileNotFoundError(f"MART SQL file not found: {sql_path}") + if "cross_year" in layers: + tables = cfg.cross_year.get("tables") or [] + if not isinstance(tables, list) or not tables: + raise ValueError("cross_year.tables missing or empty in dataset.yml") + for table in tables: + if not isinstance(table, dict): + raise ValueError("Each entry in cross_year.tables must be a mapping (dict).") + sql_path = _resolve_sql_path(cfg, table.get("sql")) + if not sql_path.exists(): + raise FileNotFoundError(f"CROSS_YEAR SQL file not found: {sql_path}") + if table.get("source_layer", "clean") == "mart" and not table.get("source_table"): + raise ValueError("cross_year.tables[].source_table is required when source_layer = mart") + return layers @@ -89,10 +105,55 @@ def _print_execution_plan(cfg, year: int, layers: list[str], context: RunContext typer.echo(f"run_record: {context.path}") typer.echo("output_dirs:") for layer in layers: - typer.echo(f" - {layer}: {layer_year_dir(cfg.root, layer, cfg.dataset, year)}") + if layer == "cross_year": + typer.echo(f" - {layer}: {layer_dataset_dir(cfg.root, 'cross', cfg.dataset)}") + else: + typer.echo(f" - {layer}: {layer_year_dir(cfg.root, layer, cfg.dataset, year)}") typer.echo("") +def run_cross_year_step( + cfg, + *, + dry_run: bool = False, + logger=None, +) -> None: + if logger is None: + logger = get_logger() + + _validate_execution_plan(cfg, "cross_year") + output_dir = layer_dataset_dir(cfg.root, "cross", cfg.dataset) + + if dry_run: + 
typer.echo("Execution Plan") + typer.echo(f"dataset: {cfg.dataset}") + typer.echo("scope: cross_year") + typer.echo("status: DRY_RUN") + typer.echo(f"years: {', '.join(str(year) for year in cfg.years)}") + typer.echo("steps: cross_year") + typer.echo(f"output_dir: {output_dir}") + typer.echo("") + return + + logger.info( + "RUN cross_year | dataset=%s years=%s base_dir=%s effective_root=%s root_source=%s", + cfg.dataset, + ",".join(str(year) for year in cfg.years), + cfg.base_dir, + cfg.root, + cfg.root_source, + ) + run_cross_year( + cfg.dataset, + cfg.years, + cfg.root, + cfg.cross_year, + logger, + base_dir=cfg.base_dir, + output_cfg=cfg.output, + ) + + def run_year( cfg, year: int, @@ -199,7 +260,7 @@ def _execute_layer(layer_name: str, target, *args, **kwargs) -> None: def run( - step: str = typer.Argument(..., help="raw | clean | mart | all"), + step: str = typer.Argument(..., help="raw | clean | mart | cross_year | all"), config: str = typer.Option(..., "--config", "-c", help="Path to dataset.yml"), dry_run: bool = typer.Option(False, "--dry-run", help="Print execution plan without executing"), strict_config: bool = typer.Option(False, "--strict-config", help="Treat deprecated config forms as errors"), @@ -211,8 +272,12 @@ def run( cfg, logger = load_cfg_and_logger(config, strict_config=strict_config_flag) dry_run_flag = dry_run if isinstance(dry_run, bool) else False - if step not in {"raw", "clean", "mart", "all"}: - raise typer.BadParameter("step must be one of: raw, clean, mart, all") + if step not in {"raw", "clean", "mart", "cross_year", "all"}: + raise typer.BadParameter("step must be one of: raw, clean, mart, cross_year, all") + + if step == "cross_year": + run_cross_year_step(cfg, dry_run=dry_run_flag, logger=logger) + return for year in iter_years(cfg, None): run_year(cfg, year, step=step, dry_run=dry_run_flag, logger=logger) diff --git a/toolkit/core/config.py b/toolkit/core/config.py index 5db6b99..106712d 100644 --- a/toolkit/core/config.py +++ 
b/toolkit/core/config.py @@ -23,6 +23,7 @@ class ToolkitConfig: raw: dict[str, Any] clean: dict[str, Any] mart: dict[str, Any] + cross_year: dict[str, Any] config: dict[str, Any] validation: dict[str, Any] output: dict[str, Any] @@ -70,6 +71,14 @@ def _compat_mart(model: ToolkitConfigModel) -> dict[str, Any]: ) +def _compat_cross_year(model: ToolkitConfigModel) -> dict[str, Any]: + return model.cross_year.model_dump( + mode="python", + exclude_none=True, + exclude_unset=True, + ) + + def load_config(path: str | Path, *, strict_config: bool = False) -> ToolkitConfig: model = load_config_model(path, strict_config=strict_config) return ToolkitConfig( @@ -82,6 +91,7 @@ def load_config(path: str | Path, *, strict_config: bool = False) -> ToolkitConf raw=_compat_raw(model), clean=_compat_clean(model), mart=_compat_mart(model), + cross_year=_compat_cross_year(model), config=model.config.model_dump(mode="python"), validation=model.validation.model_dump(mode="python"), output=model.output.model_dump(mode="python"), diff --git a/toolkit/core/config_models.py b/toolkit/core/config_models.py index 601a5ca..e964347 100644 --- a/toolkit/core/config_models.py +++ b/toolkit/core/config_models.py @@ -110,6 +110,13 @@ class ConfigDeprecation: status="ignored", message="unknown mart config keys detected", ), + "unknown.cross_year": ConfigDeprecation( + code="DCL013", + legacy="cross_year.* unknown keys", + replacement="remove unsupported cross_year keys", + status="ignored", + message="unknown cross_year config keys detected", + ), } @@ -369,6 +376,21 @@ class MartTableConfig(BaseModel): sql: Path +class CrossYearTableConfig(BaseModel): + model_config = ConfigDict(extra="forbid") + + name: str + sql: Path + source_layer: Literal["clean", "mart"] = "clean" + source_table: str | None = None + + +class CrossYearConfig(BaseModel): + model_config = ConfigDict(extra="forbid") + + tables: list[CrossYearTableConfig] = Field(default_factory=list) + + class MartTableRuleConfig(BaseModel): 
model_config = ConfigDict(extra="forbid") @@ -440,6 +462,7 @@ class ToolkitConfigModel(BaseModel): raw: RawConfig = Field(default_factory=RawConfig) clean: CleanConfig = Field(default_factory=CleanConfig) mart: MartConfig = Field(default_factory=MartConfig) + cross_year: CrossYearConfig = Field(default_factory=CrossYearConfig) config: ConfigPolicy = Field(default_factory=ConfigPolicy) validation: GlobalValidationConfig = Field(default_factory=GlobalValidationConfig) output: OutputConfig = Field(default_factory=OutputConfig) @@ -489,6 +512,9 @@ def _resolve_path_value(value: Any, *, base_dir: Path) -> Any: ("sql_dir",), ("tables", "*", "sql"), ), + "cross_year": ( + ("tables", "*", "sql"), + ), } @@ -675,6 +701,7 @@ def _declared_model_keys(model_cls: type[BaseModel]) -> set[str]: "raw", "clean", "mart", + "cross_year", "config", "validation", "output", @@ -683,6 +710,7 @@ def _declared_model_keys(model_cls: type[BaseModel]) -> set[str]: _RAW_ALLOWED_KEYS = _declared_model_keys(RawConfig) _CLEAN_ALLOWED_KEYS = _declared_model_keys(CleanConfig) | {"sql_path"} _MART_ALLOWED_KEYS = _declared_model_keys(MartConfig) | {"sql_dir"} +_CROSS_YEAR_ALLOWED_KEYS = _declared_model_keys(CrossYearConfig) def _normalize_legacy_clean_read( @@ -794,6 +822,7 @@ def _warn_or_reject_unknown_keys( ("raw", _RAW_ALLOWED_KEYS, "unknown.raw"), ("clean", _CLEAN_ALLOWED_KEYS, "unknown.clean"), ("mart", _MART_ALLOWED_KEYS, "unknown.mart"), + ("cross_year", _CROSS_YEAR_ALLOWED_KEYS, "unknown.cross_year"), ): section = normalized.get(section_name) if not isinstance(section, dict): @@ -855,6 +884,7 @@ def load_config_model(path: str | Path, *, strict_config: bool = False) -> Toolk raw = normalized.get("raw", {}) or {} clean = normalized.get("clean", {}) or {} mart = normalized.get("mart", {}) or {} + cross_year = normalized.get("cross_year", {}) or {} normalized_fields: list[tuple[str, Path]] = [] if isinstance(raw, dict): @@ -866,6 +896,9 @@ def load_config_model(path: str | Path, *, 
strict_config: bool = False) -> Toolk if isinstance(mart, dict): mart, mart_changes = _normalize_section_paths("mart", mart, base_dir=base_dir) normalized_fields.extend(mart_changes) + if isinstance(cross_year, dict): + cross_year, cross_year_changes = _normalize_section_paths("cross_year", cross_year, base_dir=base_dir) + normalized_fields.extend(cross_year_changes) normalized_fields.append(("root", root_path)) if normalized_fields: @@ -880,6 +913,7 @@ def load_config_model(path: str | Path, *, strict_config: bool = False) -> Toolk "raw": raw, "clean": clean, "mart": mart, + "cross_year": cross_year, } try: diff --git a/toolkit/core/metadata.py b/toolkit/core/metadata.py index 84a78d1..5f49912 100644 --- a/toolkit/core/metadata.py +++ b/toolkit/core/metadata.py @@ -64,7 +64,7 @@ def write_layer_manifest( folder: Path, *, metadata_path: str, - validation_path: str, + validation_path: str | None, outputs: list[dict[str, Any]], ok: bool | None, errors_count: int | None, diff --git a/toolkit/core/paths.py b/toolkit/core/paths.py index 80705e0..b9dcd5f 100644 --- a/toolkit/core/paths.py +++ b/toolkit/core/paths.py @@ -37,6 +37,10 @@ def layer_year_dir(root: str | os.PathLike[str], layer: str, dataset: str, year: return dataset_dir(root, layer, dataset) / str(year) +def layer_dataset_dir(root: str | os.PathLike[str], layer: str, dataset: str) -> Path: + return dataset_dir(root, layer, dataset) + + def run_dir(root: str | os.PathLike[str], layer: str, dataset: str, year: int | str, run_id: str) -> Path: # Esempio: data/clean/ispra/2022/_runs/20260223T221500Z_x9a2f return layer_year_dir(root, layer, dataset, year) / "_runs" / run_id diff --git a/toolkit/cross/__init__.py b/toolkit/cross/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/toolkit/cross/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/toolkit/cross/run.py b/toolkit/cross/run.py new file mode 100644 index 0000000..64ebc7b --- /dev/null +++ 
b/toolkit/cross/run.py @@ -0,0 +1,197 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import duckdb + +from toolkit.core.artifacts import ARTIFACT_POLICY_DEBUG, resolve_artifact_policy, should_write +from toolkit.core.metadata import file_record, sha256_bytes, write_layer_manifest, write_metadata +from toolkit.core.paths import layer_dataset_dir, layer_year_dir, resolve_root, to_root_relative +from toolkit.core.template import render_template + + +def _serialize_metadata_path(path: Path | None, rel_root: Path | None) -> str | None: + if path is None: + return None + if rel_root is None: + return path.as_posix() + return to_root_relative(path, rel_root) + + +def _resolve_sql_path(sql_ref: str | Path, *, base_dir: Path | None) -> Path: + path = Path(sql_ref) + if path.is_absolute(): + return path + if base_dir is None: + return path + return base_dir / path + + +def _config_hash(base_dir: Path | None) -> str | None: + if base_dir is None: + return None + path = Path(base_dir) / "dataset.yml" + if not path.exists(): + return None + return sha256_bytes(path.read_bytes()) + + +def _source_files(root: str | None, dataset: str, years: list[int], table_cfg: dict[str, Any]) -> list[Path]: + source_layer = table_cfg.get("source_layer", "clean") + source_table = table_cfg.get("source_table") + files: list[Path] = [] + + if source_layer == "clean": + for year in years: + clean_dir = layer_year_dir(root, "clean", dataset, year) + if not clean_dir.exists(): + raise FileNotFoundError(f"CLEAN dir not found: {clean_dir}. 
Run: toolkit run clean -c dataset.yml") + year_files = sorted(clean_dir.glob("*.parquet")) + if not year_files: + raise FileNotFoundError(f"No CLEAN parquet found in {clean_dir}") + files.extend(year_files) + return files + + if source_layer == "mart": + if not source_table: + raise ValueError("cross_year.tables[].source_table is required when source_layer = mart") + for year in years: + mart_file = layer_year_dir(root, "mart", dataset, year) / f"{source_table}.parquet" + if not mart_file.exists(): + raise FileNotFoundError( + f"MART parquet not found: {mart_file}. Run: toolkit run mart -c dataset.yml" + ) + files.append(mart_file) + return files + + raise ValueError(f"Unsupported cross_year source_layer: {source_layer}") + + +def _bind_source_view(con: duckdb.DuckDBPyConnection, files: list[Path], source_layer: str) -> None: + if len(files) == 1: + source_expr = f"read_parquet('{files[0]}')" + else: + paths = ",".join(f"'{path}'" for path in files) + source_expr = f"read_parquet([{paths}])" + + con.execute(f"CREATE OR REPLACE VIEW source_input AS SELECT * FROM {source_expr}") + con.execute(f"CREATE OR REPLACE VIEW {source_layer}_input AS SELECT * FROM source_input") + con.execute(f"CREATE OR REPLACE VIEW {source_layer} AS SELECT * FROM source_input") + con.execute(f"CREATE OR REPLACE VIEW {source_layer}_all_years AS SELECT * FROM source_input") + + +def run_cross_year( + dataset: str, + years: list[int], + root: str | None, + cross_year_cfg: dict[str, Any], + logger, + *, + base_dir: Path | None = None, + output_cfg: dict[str, Any] | None = None, +) -> None: + policy = resolve_artifact_policy(output_cfg) + root_dir = resolve_root(root) + cross_dir = layer_dataset_dir(root, "cross", dataset) + cross_dir.mkdir(parents=True, exist_ok=True) + + tables = cross_year_cfg.get("tables") or [] + if not isinstance(tables, list) or not tables: + raise ValueError("cross_year.tables missing or empty in dataset.yml") + + con = duckdb.connect(":memory:") + try: + template_ctx = { 
+ "dataset": dataset, + "years": ",".join(str(year) for year in years), + "years_csv": ",".join(str(year) for year in years), + } + + run_dir: Path | None = None + if should_write("mart", "rendered_sql", policy, {"output": output_cfg or {}}): + run_dir = cross_dir / "_run" + run_dir.mkdir(parents=True, exist_ok=True) + + written: list[Path] = [] + executed: list[dict[str, Any]] = [] + debug_tables: list[dict[str, Any]] = [] + + for i, table in enumerate(tables, start=1): + if not isinstance(table, dict): + raise ValueError("Each entry in cross_year.tables must be a mapping (dict).") + + name = table.get("name") + sql_rel = table.get("sql") + source_layer = table.get("source_layer", "clean") + if not name or not sql_rel: + raise ValueError("Each cross_year.tables entry must include: name, sql") + + files = _source_files(root, dataset, years, table) + _bind_source_view(con, files, source_layer) + + sql_path = _resolve_sql_path(sql_rel, base_dir=base_dir) + if not sql_path.exists(): + raise FileNotFoundError(f"CROSS_YEAR SQL file not found: {sql_path}") + + sql = render_template(sql_path.read_text(encoding="utf-8"), template_ctx) + + rendered_sql_path: Path | None = None + if run_dir is not None: + rendered_sql_path = run_dir / f"{i:02d}_{name}_rendered.sql" + rendered_sql_path.write_text(sql, encoding="utf-8") + + con.execute(f"CREATE OR REPLACE TABLE {name} AS {sql}") + out = cross_dir / f"{name}.parquet" + con.execute(f"COPY {name} TO '{out}' (FORMAT PARQUET);") + + written.append(out) + executed.append( + { + "name": name, + "sql": _serialize_metadata_path(sql_path, base_dir), + "sql_rendered": _serialize_metadata_path(rendered_sql_path, root_dir), + "output": _serialize_metadata_path(out, root_dir), + "source_layer": source_layer, + "source_table": table.get("source_table"), + "source_inputs": [_serialize_metadata_path(path, root_dir) for path in files], + } + ) + if policy == ARTIFACT_POLICY_DEBUG: + debug_tables.append( + { + "name": name, + "sql_absolute": 
str(sql_path.resolve()), + "sql_rendered_absolute": str(rendered_sql_path.resolve()) if rendered_sql_path else None, + "output_absolute": str(out.resolve()), + } + ) + finally: + con.close() + + outputs = [file_record(path) for path in written] + metadata_payload = { + "layer": "cross", + "dataset": dataset, + "years": years, + "config_hash": _config_hash(base_dir), + "outputs": outputs, + "output_paths": [_serialize_metadata_path(path, root_dir) for path in written], + "tables": executed, + } + if policy == ARTIFACT_POLICY_DEBUG: + metadata_payload["debug"] = { + "output_root_absolute": str(root_dir.resolve()), + "tables": debug_tables, + } + metadata_path = write_metadata(cross_dir, metadata_payload) + write_layer_manifest( + cross_dir, + metadata_path=metadata_path.name, + validation_path=None, + outputs=outputs, + ok=None, + errors_count=None, + warnings_count=None, + ) + logger.info("CROSS_YEAR -> %s", cross_dir)