diff --git a/tests/test_run_dry_run.py b/tests/test_run_dry_run.py index 64de82d..18bcf37 100644 --- a/tests/test_run_dry_run.py +++ b/tests/test_run_dry_run.py @@ -45,6 +45,7 @@ def test_run_dry_run_prints_plan_and_creates_only_run_record(tmp_path: Path) -> assert "Execution Plan" in result.output assert "status: DRY_RUN" in result.output assert "steps: raw, clean, mart" in result.output + assert "sql_validation: OK" in result.output runs_dir = root_dir / "data" / "_runs" / "demo_ds" / "2022" records = list(runs_dir.glob("*.json")) @@ -59,6 +60,111 @@ def test_run_dry_run_prints_plan_and_creates_only_run_record(tmp_path: Path) -> assert not (root_dir / "data" / "mart" / "demo_ds" / "2022").exists() +def test_run_dry_run_fails_on_clean_sql_syntax_error(tmp_path: Path) -> None: + sql_dir = tmp_path / "sql" / "mart" + sql_dir.mkdir(parents=True, exist_ok=True) + (tmp_path / "sql" / "clean.sql").write_text("select from raw_input", encoding="utf-8") + (sql_dir / "mart_example.sql").write_text("select * from clean_input", encoding="utf-8") + + config_path = tmp_path / "dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "demo_ds"', + " years: [2022]", + "raw: {}", + "clean:", + ' sql: "sql/clean.sql"', + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "all", "--config", str(config_path), "--dry-run"]) + + assert result.exit_code != 0 + assert "CLEAN SQL dry-run failed" in str(result.exception) + + +def test_run_dry_run_fails_on_mart_sql_binding_error(tmp_path: Path) -> None: + sql_dir = tmp_path / "sql" / "mart" + sql_dir.mkdir(parents=True, exist_ok=True) + (tmp_path / "sql" / "clean.sql").write_text('select "x" as value from raw_input', encoding="utf-8") + (sql_dir / "mart_example.sql").write_text("select missing_col from clean_input", encoding="utf-8") + + config_path = tmp_path / "dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "demo_ds"', + " years: [2022]", + "raw: {}", + "clean:", + ' sql: "sql/clean.sql"', + " read:", + " columns:", + ' x: "VARCHAR"', + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "all", "--config", str(config_path), "--dry-run"]) + + assert result.exit_code != 0 + assert "MART SQL dry-run failed" in str(result.exception) + + +def test_run_dry_run_accepts_unquoted_raw_columns_without_read_columns(tmp_path: Path) -> None: + sql_dir = tmp_path / "sql" / "mart" + sql_dir.mkdir(parents=True, exist_ok=True) + (tmp_path / "sql" / "clean.sql").write_text("select x as value from raw_input", encoding="utf-8") + (sql_dir / "mart_example.sql").write_text("select * from clean_input", encoding="utf-8") + + config_path = tmp_path / "dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "demo_ds"', + " years: [2022]", + "raw: {}", + "clean:", + ' sql: "sql/clean.sql"', + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "all", "--config", str(config_path), "--dry-run"]) + + assert result.exit_code == 0 + assert "sql_validation: OK" in result.output + + def test_run_year_logs_effective_root_context(tmp_path: Path, caplog) -> None: sql_dir = tmp_path / "sql" / "mart" sql_dir.mkdir(parents=True, exist_ok=True) diff --git a/toolkit/cli/cmd_run.py b/toolkit/cli/cmd_run.py index b3b2768..7f76bca 100644 --- a/toolkit/cli/cmd_run.py +++ b/toolkit/cli/cmd_run.py @@ -5,6 +5,7 @@ import typer from toolkit.cli.common import iter_selected_years, load_cfg_and_logger +from toolkit.cli.sql_dry_run import validate_sql_dry_run from toolkit.clean.run import run_clean from toolkit.clean.validate import run_clean_validation from toolkit.cross.run import run_cross_year @@ -197,6 +198,14 @@ def run_year( if dry_run: context.mark_dry_run() _print_execution_plan(cfg, year, layers_to_run, context, fail_on_error) + try: + validate_sql_dry_run(cfg, year=year, layers=layers_to_run) + except Exception as exc: + context.fail_run(str(exc)) + raise + if any(layer in {"clean", "mart"} for layer in layers_to_run): + typer.echo("sql_validation: OK") + typer.echo("") return context base_logger.info(f"RUN -> step={step} dataset={cfg.dataset} year={year}") diff --git a/toolkit/cli/sql_dry_run.py b/toolkit/cli/sql_dry_run.py new file mode 100644 index 0000000..f3c4ef1 --- /dev/null +++ b/toolkit/cli/sql_dry_run.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +import re +from typing import Any + +import duckdb + +from toolkit.clean.run import _load_clean_sql +from toolkit.core.template import render_template +from toolkit.mart.run import _resolve_sql_path as _resolve_mart_sql_path + +_QUOTED_IDENTIFIER_RE = re.compile(r'"([^"]+)"') +_BINDER_MISSING_COLUMN_RE = re.compile(r'Referenced column "([^"]+)" not found in FROM clause') + + +def _dedupe_preserve_order(items: list[str]) -> list[str]: + seen: set[str] = set() + result: list[str] = [] + for item in items: + if not item or item in seen: + continue + seen.add(item) + result.append(item) + return result + + +def _placeholder_columns(clean_cfg: dict[str, Any], sql: str) -> list[str]: + columns: list[str] = [] + read_cfg = clean_cfg.get("read") or {} + read_columns = read_cfg.get("columns") or {} + if isinstance(read_columns, dict): + columns.extend(str(name) for name in read_columns.keys()) + + # Fallback minimale: raccoglie identifier quoted dal SQL per costruire un + # raw_input placeholder abbastanza utile nel dry-run. E' deliberatamente + # approssimativo: puo' includere nomi non-colonna e non copre colonne non + # quotate se non sono gia' dichiarate in clean.read.columns. + columns.extend(match.group(1) for match in _QUOTED_IDENTIFIER_RE.finditer(sql)) + return _dedupe_preserve_order(columns) + + +def _quoted_identifier(name: str) -> str: + return '"' + name.replace('"', '""') + '"' + + +def _normalize_sql(sql: str) -> str: + return sql.strip().rstrip(";").strip() + + +def _create_placeholder_raw_input(con: duckdb.DuckDBPyConnection, clean_cfg: dict[str, Any], sql: str) -> None: + columns = _placeholder_columns(clean_cfg, sql) + _create_placeholder_raw_input_with_columns(con, columns) + + +def _create_placeholder_raw_input_with_columns( + con: duckdb.DuckDBPyConnection, + columns: list[str], +) -> None: + if columns: + projection = ", ".join(f"NULL::VARCHAR AS {_quoted_identifier(name)}" for name in columns) + else: + projection = "NULL::VARCHAR AS __dry_run_placeholder" + con.execute(f"CREATE OR REPLACE VIEW raw_input AS SELECT {projection} LIMIT 0") + + +def _extract_missing_binder_column(exc: Exception) -> str | None: + match = _BINDER_MISSING_COLUMN_RE.search(str(exc)) + if not match: + return None + return match.group(1) + + +def _build_clean_preview( + cfg, + *, + year: int, + con: duckdb.DuckDBPyConnection, +) -> None: + clean_sql_path, clean_sql, _ = _load_clean_sql( + cfg.clean, + dataset=cfg.dataset, + year=year, + base_dir=cfg.base_dir, + ) + clean_sql = _normalize_sql(clean_sql) + columns = _placeholder_columns(cfg.clean, clean_sql) + + # Fallback incrementale: se il clean.sql usa colonne raw non quotate e non + # dichiarate in clean.read.columns, il binder di DuckDB ci dice il nome + # mancante. Lo aggiungiamo al placeholder e riproviamo, cosi' il dry-run + # evita falsi positivi banali senza provare a parsare SQL completo. + for _ in range(25): + _create_placeholder_raw_input_with_columns(con, columns) + try: + con.execute(f"CREATE OR REPLACE TABLE __dry_run_clean_preview AS SELECT * FROM ({clean_sql}) AS q LIMIT 0") + return + except Exception as exc: + missing = _extract_missing_binder_column(exc) + if missing and missing not in columns: + columns.append(missing) + continue + raise ValueError(f"CLEAN SQL dry-run failed ({clean_sql_path}): {exc}") from exc + + raise ValueError( + f"CLEAN SQL dry-run failed ({clean_sql_path}): exceeded placeholder inference attempts" + ) + + +def _validate_mart_sql(cfg, *, year: int, con: duckdb.DuckDBPyConnection) -> None: + con.execute("CREATE OR REPLACE VIEW clean_input AS SELECT * FROM __dry_run_clean_preview") + con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input") + + tables = cfg.mart.get("tables") or [] + template_ctx = {"year": year, "dataset": cfg.dataset} + + for table in tables: + name = table.get("name") + sql_ref = table.get("sql") + sql_path = _resolve_mart_sql_path(sql_ref, base_dir=cfg.base_dir) + sql = _normalize_sql(render_template(sql_path.read_text(encoding="utf-8"), template_ctx)) + try: + con.execute(f"EXPLAIN SELECT * FROM ({sql}) AS q LIMIT 0") + except Exception as exc: + raise ValueError(f"MART SQL dry-run failed ({name}, {sql_path}): {exc}") from exc + + +def validate_sql_dry_run(cfg, *, year: int, layers: list[str]) -> None: + # Oggi il check copre solo CLEAN e MART. cross_year resta fuori perche' + # ha un contratto di input diverso e richiede una validazione dedicata. + if not any(layer in {"clean", "mart"} for layer in layers): + return + + con = duckdb.connect(":memory:") + try: + _build_clean_preview(cfg, year=year, con=con) + if "mart" in layers: + _validate_mart_sql(cfg, year=year, con=con) + finally: + con.close()