From 13d42fd0760bdb5972ed6bb93d3e198da1b4f354 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:09:55 +0000 Subject: [PATCH 1/4] feat: validate SQL during dry-run --- tests/test_run_dry_run.py | 72 ++++++++++++++++++++++++++ toolkit/cli/cmd_run.py | 9 ++++ toolkit/cli/sql_dry_run.py | 103 +++++++++++++++++++++++++++++++++++++ 3 files changed, 184 insertions(+) create mode 100644 toolkit/cli/sql_dry_run.py diff --git a/tests/test_run_dry_run.py b/tests/test_run_dry_run.py index 64de82d..0e5c3f2 100644 --- a/tests/test_run_dry_run.py +++ b/tests/test_run_dry_run.py @@ -45,6 +45,7 @@ def test_run_dry_run_prints_plan_and_creates_only_run_record(tmp_path: Path) -> assert "Execution Plan" in result.output assert "status: DRY_RUN" in result.output assert "steps: raw, clean, mart" in result.output + assert "sql_validation: OK" in result.output runs_dir = root_dir / "data" / "_runs" / "demo_ds" / "2022" records = list(runs_dir.glob("*.json")) @@ -59,6 +60,77 @@ def test_run_dry_run_prints_plan_and_creates_only_run_record(tmp_path: Path) -> assert not (root_dir / "data" / "mart" / "demo_ds" / "2022").exists() +def test_run_dry_run_fails_on_clean_sql_syntax_error(tmp_path: Path) -> None: + sql_dir = tmp_path / "sql" / "mart" + sql_dir.mkdir(parents=True, exist_ok=True) + (tmp_path / "sql" / "clean.sql").write_text("select from raw_input", encoding="utf-8") + (sql_dir / "mart_example.sql").write_text("select * from clean_input", encoding="utf-8") + + config_path = tmp_path / "dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "demo_ds"', + " years: [2022]", + "raw: {}", + "clean:", + ' sql: "sql/clean.sql"', + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "all", "--config", 
str(config_path), "--dry-run"]) + + assert result.exit_code != 0 + assert "CLEAN SQL dry-run failed" in str(result.exception) + + +def test_run_dry_run_fails_on_mart_sql_binding_error(tmp_path: Path) -> None: + sql_dir = tmp_path / "sql" / "mart" + sql_dir.mkdir(parents=True, exist_ok=True) + (tmp_path / "sql" / "clean.sql").write_text('select "x" as value from raw_input', encoding="utf-8") + (sql_dir / "mart_example.sql").write_text("select missing_col from clean_input", encoding="utf-8") + + config_path = tmp_path / "dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "demo_ds"', + " years: [2022]", + "raw: {}", + "clean:", + ' sql: "sql/clean.sql"', + " read:", + " columns:", + ' x: "VARCHAR"', + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "all", "--config", str(config_path), "--dry-run"]) + + assert result.exit_code != 0 + assert "MART SQL dry-run failed" in str(result.exception) + + def test_run_year_logs_effective_root_context(tmp_path: Path, caplog) -> None: sql_dir = tmp_path / "sql" / "mart" sql_dir.mkdir(parents=True, exist_ok=True) diff --git a/toolkit/cli/cmd_run.py b/toolkit/cli/cmd_run.py index b3b2768..59c70d3 100644 --- a/toolkit/cli/cmd_run.py +++ b/toolkit/cli/cmd_run.py @@ -5,6 +5,7 @@ import typer from toolkit.cli.common import iter_selected_years, load_cfg_and_logger +from toolkit.cli.sql_dry_run import validate_sql_dry_run from toolkit.clean.run import run_clean from toolkit.clean.validate import run_clean_validation from toolkit.cross.run import run_cross_year @@ -197,6 +198,14 @@ def run_year( if dry_run: context.mark_dry_run() _print_execution_plan(cfg, year, layers_to_run, context, fail_on_error) + try: + validate_sql_dry_run(cfg, year=year, step=step) + except Exception as exc: + context.fail_run(str(exc)) 
+ raise + if any(layer in {"clean", "mart"} for layer in layers_to_run): + typer.echo("sql_validation: OK") + typer.echo("") return context base_logger.info(f"RUN -> step={step} dataset={cfg.dataset} year={year}") diff --git a/toolkit/cli/sql_dry_run.py b/toolkit/cli/sql_dry_run.py new file mode 100644 index 0000000..d67a12d --- /dev/null +++ b/toolkit/cli/sql_dry_run.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import re +from pathlib import Path +from typing import Any + +import duckdb + +from toolkit.clean.run import _load_clean_sql +from toolkit.core.template import render_template +from toolkit.mart.run import _resolve_sql_path as _resolve_mart_sql_path + +_QUOTED_IDENTIFIER_RE = re.compile(r'"([^"]+)"') + + +def _dedupe_preserve_order(items: list[str]) -> list[str]: + seen: set[str] = set() + result: list[str] = [] + for item in items: + if not item or item in seen: + continue + seen.add(item) + result.append(item) + return result + + +def _placeholder_columns(clean_cfg: dict[str, Any], sql: str) -> list[str]: + columns: list[str] = [] + read_cfg = clean_cfg.get("read") or {} + read_columns = read_cfg.get("columns") or {} + if isinstance(read_columns, dict): + columns.extend(str(name) for name in read_columns.keys()) + + columns.extend(match.group(1) for match in _QUOTED_IDENTIFIER_RE.finditer(sql)) + return _dedupe_preserve_order(columns) + + +def _quoted_identifier(name: str) -> str: + return '"' + name.replace('"', '""') + '"' + + +def _normalize_sql(sql: str) -> str: + return sql.strip().rstrip(";").strip() + + +def _create_placeholder_raw_input(con: duckdb.DuckDBPyConnection, clean_cfg: dict[str, Any], sql: str) -> None: + columns = _placeholder_columns(clean_cfg, sql) + if columns: + projection = ", ".join(f"NULL::VARCHAR AS {_quoted_identifier(name)}" for name in columns) + else: + projection = "NULL::VARCHAR AS __dry_run_placeholder" + con.execute(f"CREATE OR REPLACE VIEW raw_input AS SELECT {projection} LIMIT 0") + + +def 
_build_clean_preview( + cfg, + *, + year: int, + con: duckdb.DuckDBPyConnection, +) -> None: + clean_sql_path, clean_sql, _ = _load_clean_sql( + cfg.clean, + dataset=cfg.dataset, + year=year, + base_dir=cfg.base_dir, + ) + clean_sql = _normalize_sql(clean_sql) + _create_placeholder_raw_input(con, cfg.clean, clean_sql) + try: + con.execute(f"CREATE OR REPLACE TABLE __dry_run_clean_preview AS SELECT * FROM ({clean_sql}) AS q LIMIT 0") + except Exception as exc: + raise ValueError(f"CLEAN SQL dry-run failed ({clean_sql_path}): {exc}") from exc + + +def _validate_mart_sql(cfg, *, year: int, con: duckdb.DuckDBPyConnection) -> None: + con.execute("CREATE OR REPLACE VIEW clean_input AS SELECT * FROM __dry_run_clean_preview") + con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input") + + tables = cfg.mart.get("tables") or [] + template_ctx = {"year": year, "dataset": cfg.dataset} + + for table in tables: + name = table.get("name") + sql_ref = table.get("sql") + sql_path = _resolve_mart_sql_path(sql_ref, base_dir=cfg.base_dir) + sql = _normalize_sql(render_template(sql_path.read_text(encoding="utf-8"), template_ctx)) + try: + con.execute(f"EXPLAIN SELECT * FROM ({sql}) AS q LIMIT 0") + except Exception as exc: + raise ValueError(f"MART SQL dry-run failed ({name}, {sql_path}): {exc}") from exc + + +def validate_sql_dry_run(cfg, *, year: int, step: str) -> None: + if step not in {"clean", "mart", "all"}: + return + + con = duckdb.connect(":memory:") + try: + _build_clean_preview(cfg, year=year, con=con) + if step in {"mart", "all"}: + _validate_mart_sql(cfg, year=year, con=con) + finally: + con.close() From 853a14864f9b4686ab9a182860993c18eaf2668e Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:13:45 +0000 Subject: [PATCH 2/4] docs: clarify dry-run SQL validation limits --- toolkit/cli/cmd_run.py | 2 +- toolkit/cli/sql_dry_run.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 
deletions(-) diff --git a/toolkit/cli/cmd_run.py b/toolkit/cli/cmd_run.py index 59c70d3..7f76bca 100644 --- a/toolkit/cli/cmd_run.py +++ b/toolkit/cli/cmd_run.py @@ -199,7 +199,7 @@ def run_year( context.mark_dry_run() _print_execution_plan(cfg, year, layers_to_run, context, fail_on_error) try: - validate_sql_dry_run(cfg, year=year, step=step) + validate_sql_dry_run(cfg, year=year, layers=layers_to_run) except Exception as exc: context.fail_run(str(exc)) raise diff --git a/toolkit/cli/sql_dry_run.py b/toolkit/cli/sql_dry_run.py index d67a12d..b1522de 100644 --- a/toolkit/cli/sql_dry_run.py +++ b/toolkit/cli/sql_dry_run.py @@ -31,6 +31,10 @@ def _placeholder_columns(clean_cfg: dict[str, Any], sql: str) -> list[str]: if isinstance(read_columns, dict): columns.extend(str(name) for name in read_columns.keys()) + # Minimal fallback: collect quoted identifiers from the SQL to build a + # raw_input placeholder that is useful enough for the dry-run. It is + # deliberately approximate: it may include non-column names, and it misses + # unquoted columns unless they are already declared in clean.read.columns. columns.extend(match.group(1) for match in _QUOTED_IDENTIFIER_RE.finditer(sql)) return _dedupe_preserve_order(columns) @@ -90,14 +94,16 @@ def _validate_mart_sql(cfg, *, year: int, con: duckdb.DuckDBPyConnection) -> Non raise ValueError(f"MART SQL dry-run failed ({name}, {sql_path}): {exc}") from exc -def validate_sql_dry_run(cfg, *, year: int, step: str) -> None: - if step not in {"clean", "mart", "all"}: +def validate_sql_dry_run(cfg, *, year: int, layers: list[str]) -> None: + # Today the check covers only CLEAN and MART. cross_year is left out because + # it has a different input contract and requires dedicated validation.
+ if not any(layer in {"clean", "mart"} for layer in layers): return con = duckdb.connect(":memory:") try: _build_clean_preview(cfg, year=year, con=con) - if step in {"mart", "all"}: + if "mart" in layers: _validate_mart_sql(cfg, year=year, con=con) finally: con.close() From 680b6d7671fce832dac0565935f5f51e1d78f2c6 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:18:13 +0000 Subject: [PATCH 3/4] fix: remove unused dry-run import --- toolkit/cli/sql_dry_run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/toolkit/cli/sql_dry_run.py b/toolkit/cli/sql_dry_run.py index b1522de..819c04f 100644 --- a/toolkit/cli/sql_dry_run.py +++ b/toolkit/cli/sql_dry_run.py @@ -1,7 +1,6 @@ from __future__ import annotations import re -from pathlib import Path from typing import Any import duckdb From 140418a7d66daff7cac10fb736f8da916bf38c21 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Thu, 19 Mar 2026 11:09:42 +0000 Subject: [PATCH 4/4] fix: relax dry-run placeholder inference --- tests/test_run_dry_run.py | 34 +++++++++++++++++++++++++++++++ toolkit/cli/sql_dry_run.py | 41 +++++++++++++++++++++++++++++++++----- 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/tests/test_run_dry_run.py b/tests/test_run_dry_run.py index 0e5c3f2..18bcf37 100644 --- a/tests/test_run_dry_run.py +++ b/tests/test_run_dry_run.py @@ -131,6 +131,40 @@ def test_run_dry_run_fails_on_mart_sql_binding_error(tmp_path: Path) -> None: assert "MART SQL dry-run failed" in str(result.exception) +def test_run_dry_run_accepts_unquoted_raw_columns_without_read_columns(tmp_path: Path) -> None: + sql_dir = tmp_path / "sql" / "mart" + sql_dir.mkdir(parents=True, exist_ok=True) + (tmp_path / "sql" / "clean.sql").write_text("select x as value from raw_input", encoding="utf-8") + (sql_dir / "mart_example.sql").write_text("select * from clean_input", encoding="utf-8") + + config_path = tmp_path / 
"dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "demo_ds"', + " years: [2022]", + "raw: {}", + "clean:", + ' sql: "sql/clean.sql"', + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "all", "--config", str(config_path), "--dry-run"]) + + assert result.exit_code == 0 + assert "sql_validation: OK" in result.output + + def test_run_year_logs_effective_root_context(tmp_path: Path, caplog) -> None: sql_dir = tmp_path / "sql" / "mart" sql_dir.mkdir(parents=True, exist_ok=True) diff --git a/toolkit/cli/sql_dry_run.py b/toolkit/cli/sql_dry_run.py index 819c04f..f3c4ef1 100644 --- a/toolkit/cli/sql_dry_run.py +++ b/toolkit/cli/sql_dry_run.py @@ -10,6 +10,7 @@ from toolkit.mart.run import _resolve_sql_path as _resolve_mart_sql_path _QUOTED_IDENTIFIER_RE = re.compile(r'"([^"]+)"') +_BINDER_MISSING_COLUMN_RE = re.compile(r'Referenced column "([^"]+)" not found in FROM clause') def _dedupe_preserve_order(items: list[str]) -> list[str]: @@ -48,6 +49,13 @@ def _normalize_sql(sql: str) -> str: def _create_placeholder_raw_input(con: duckdb.DuckDBPyConnection, clean_cfg: dict[str, Any], sql: str) -> None: columns = _placeholder_columns(clean_cfg, sql) + _create_placeholder_raw_input_with_columns(con, columns) + + +def _create_placeholder_raw_input_with_columns( + con: duckdb.DuckDBPyConnection, + columns: list[str], +) -> None: if columns: projection = ", ".join(f"NULL::VARCHAR AS {_quoted_identifier(name)}" for name in columns) else: @@ -55,6 +63,13 @@ def _create_placeholder_raw_input(con: duckdb.DuckDBPyConnection, clean_cfg: dic con.execute(f"CREATE OR REPLACE VIEW raw_input AS SELECT {projection} LIMIT 0") +def _extract_missing_binder_column(exc: Exception) -> str | None: + match = _BINDER_MISSING_COLUMN_RE.search(str(exc)) + if not match: + 
return None + return match.group(1) + + def _build_clean_preview( cfg, *, @@ -68,11 +83,27 @@ def _build_clean_preview( base_dir=cfg.base_dir, ) clean_sql = _normalize_sql(clean_sql) - _create_placeholder_raw_input(con, cfg.clean, clean_sql) - try: - con.execute(f"CREATE OR REPLACE TABLE __dry_run_clean_preview AS SELECT * FROM ({clean_sql}) AS q LIMIT 0") - except Exception as exc: - raise ValueError(f"CLEAN SQL dry-run failed ({clean_sql_path}): {exc}") from exc + columns = _placeholder_columns(cfg.clean, clean_sql) + + # Fallback incrementale: se il clean.sql usa colonne raw non quotate e non + # dichiarate in clean.read.columns, il binder di DuckDB ci dice il nome + # mancante. Lo aggiungiamo al placeholder e riproviamo, cosi' il dry-run + # evita falsi positivi banali senza provare a parsare SQL completo. + for _ in range(25): + _create_placeholder_raw_input_with_columns(con, columns) + try: + con.execute(f"CREATE OR REPLACE TABLE __dry_run_clean_preview AS SELECT * FROM ({clean_sql}) AS q LIMIT 0") + return + except Exception as exc: + missing = _extract_missing_binder_column(exc) + if missing and missing not in columns: + columns.append(missing) + continue + raise ValueError(f"CLEAN SQL dry-run failed ({clean_sql_path}): {exc}") from exc + + raise ValueError( + f"CLEAN SQL dry-run failed ({clean_sql_path}): exceeded placeholder inference attempts" + ) def _validate_mart_sql(cfg, *, year: int, con: duckdb.DuckDBPyConnection) -> None: