Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions tests/test_run_dry_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def test_run_dry_run_prints_plan_and_creates_only_run_record(tmp_path: Path) ->
assert "Execution Plan" in result.output
assert "status: DRY_RUN" in result.output
assert "steps: raw, clean, mart" in result.output
assert "sql_validation: OK" in result.output

runs_dir = root_dir / "data" / "_runs" / "demo_ds" / "2022"
records = list(runs_dir.glob("*.json"))
Expand All @@ -59,6 +60,111 @@ def test_run_dry_run_prints_plan_and_creates_only_run_record(tmp_path: Path) ->
assert not (root_dir / "data" / "mart" / "demo_ds" / "2022").exists()


def test_run_dry_run_fails_on_clean_sql_syntax_error(tmp_path: Path) -> None:
    """A syntactically invalid clean.sql must abort the dry-run with a CLEAN error."""
    mart_sql_dir = tmp_path / "sql" / "mart"
    mart_sql_dir.mkdir(parents=True, exist_ok=True)
    # "select from raw_input" has no select list: a plain syntax error.
    (tmp_path / "sql" / "clean.sql").write_text("select from raw_input", encoding="utf-8")
    (mart_sql_dir / "mart_example.sql").write_text("select * from clean_input", encoding="utf-8")

    out_root = tmp_path / "out"
    cfg_file = tmp_path / "dataset.yml"
    cfg_lines = [
        f'root: "{out_root.as_posix()}"',
        "dataset:",
        '  name: "demo_ds"',
        "  years: [2022]",
        "raw: {}",
        "clean:",
        '  sql: "sql/clean.sql"',
        "mart:",
        "  tables:",
        '    - name: "mart_example"',
        '      sql: "sql/mart/mart_example.sql"',
    ]
    cfg_file.write_text("\n".join(cfg_lines), encoding="utf-8")

    outcome = CliRunner().invoke(app, ["run", "all", "--config", str(cfg_file), "--dry-run"])

    # The command must fail and surface the CLEAN validation message.
    assert outcome.exit_code != 0
    assert "CLEAN SQL dry-run failed" in str(outcome.exception)


def test_run_dry_run_fails_on_mart_sql_binding_error(tmp_path: Path) -> None:
    """A mart SQL referencing a column absent from the clean output must fail the dry-run."""
    mart_sql_dir = tmp_path / "sql" / "mart"
    mart_sql_dir.mkdir(parents=True, exist_ok=True)
    # CLEAN exposes only "value"; the mart query asks for missing_col.
    (tmp_path / "sql" / "clean.sql").write_text('select "x" as value from raw_input', encoding="utf-8")
    (mart_sql_dir / "mart_example.sql").write_text("select missing_col from clean_input", encoding="utf-8")

    out_root = tmp_path / "out"
    cfg_file = tmp_path / "dataset.yml"
    cfg_lines = [
        f'root: "{out_root.as_posix()}"',
        "dataset:",
        '  name: "demo_ds"',
        "  years: [2022]",
        "raw: {}",
        "clean:",
        '  sql: "sql/clean.sql"',
        "  read:",
        "    columns:",
        '      x: "VARCHAR"',
        "mart:",
        "  tables:",
        '    - name: "mart_example"',
        '      sql: "sql/mart/mart_example.sql"',
    ]
    cfg_file.write_text("\n".join(cfg_lines), encoding="utf-8")

    outcome = CliRunner().invoke(app, ["run", "all", "--config", str(cfg_file), "--dry-run"])

    # The command must fail and surface the MART validation message.
    assert outcome.exit_code != 0
    assert "MART SQL dry-run failed" in str(outcome.exception)


def test_run_dry_run_accepts_unquoted_raw_columns_without_read_columns(tmp_path: Path) -> None:
    """An unquoted, undeclared raw column must still pass via binder-driven inference."""
    mart_sql_dir = tmp_path / "sql" / "mart"
    mart_sql_dir.mkdir(parents=True, exist_ok=True)
    # "x" is neither quoted nor declared in clean.read.columns; the dry-run
    # must infer it from the binder error instead of failing.
    (tmp_path / "sql" / "clean.sql").write_text("select x as value from raw_input", encoding="utf-8")
    (mart_sql_dir / "mart_example.sql").write_text("select * from clean_input", encoding="utf-8")

    out_root = tmp_path / "out"
    cfg_file = tmp_path / "dataset.yml"
    cfg_lines = [
        f'root: "{out_root.as_posix()}"',
        "dataset:",
        '  name: "demo_ds"',
        "  years: [2022]",
        "raw: {}",
        "clean:",
        '  sql: "sql/clean.sql"',
        "mart:",
        "  tables:",
        '    - name: "mart_example"',
        '      sql: "sql/mart/mart_example.sql"',
    ]
    cfg_file.write_text("\n".join(cfg_lines), encoding="utf-8")

    outcome = CliRunner().invoke(app, ["run", "all", "--config", str(cfg_file), "--dry-run"])

    # Success path: dry-run completes and reports SQL validation OK.
    assert outcome.exit_code == 0
    assert "sql_validation: OK" in outcome.output


def test_run_year_logs_effective_root_context(tmp_path: Path, caplog) -> None:
sql_dir = tmp_path / "sql" / "mart"
sql_dir.mkdir(parents=True, exist_ok=True)
Expand Down
9 changes: 9 additions & 0 deletions toolkit/cli/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typer

from toolkit.cli.common import iter_selected_years, load_cfg_and_logger
from toolkit.cli.sql_dry_run import validate_sql_dry_run
from toolkit.clean.run import run_clean
from toolkit.clean.validate import run_clean_validation
from toolkit.cross.run import run_cross_year
Expand Down Expand Up @@ -197,6 +198,14 @@ def run_year(
if dry_run:
context.mark_dry_run()
_print_execution_plan(cfg, year, layers_to_run, context, fail_on_error)
try:
validate_sql_dry_run(cfg, year=year, layers=layers_to_run)
except Exception as exc:
context.fail_run(str(exc))
raise
if any(layer in {"clean", "mart"} for layer in layers_to_run):
typer.echo("sql_validation: OK")
typer.echo("")
return context

base_logger.info(f"RUN -> step={step} dataset={cfg.dataset} year={year}")
Expand Down
139 changes: 139 additions & 0 deletions toolkit/cli/sql_dry_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from __future__ import annotations

import re
from typing import Any

import duckdb

from toolkit.clean.run import _load_clean_sql
from toolkit.core.template import render_template
from toolkit.mart.run import _resolve_sql_path as _resolve_mart_sql_path

# Matches a double-quoted SQL identifier, capturing the unquoted name.
_QUOTED_IDENTIFIER_RE = re.compile(r'"([^"]+)"')
# Matches the DuckDB binder error raised when a query references a column
# missing from its FROM clause, capturing the column name — NOTE(review):
# tied to DuckDB's exact message wording; verify after DuckDB upgrades.
_BINDER_MISSING_COLUMN_RE = re.compile(r'Referenced column "([^"]+)" not found in FROM clause')


def _dedupe_preserve_order(items: list[str]) -> list[str]:
seen: set[str] = set()
result: list[str] = []
for item in items:
if not item or item in seen:
continue
seen.add(item)
result.append(item)
return result


def _placeholder_columns(clean_cfg: dict[str, Any], sql: str) -> list[str]:
    """Collect candidate raw_input column names for the dry-run placeholder.

    Starts from clean.read.columns when declared, then falls back to every
    double-quoted identifier found in the SQL text. The fallback is
    deliberately approximate: it may pick up non-column names, and it misses
    unquoted columns that are not already declared in clean.read.columns.
    """
    declared = (clean_cfg.get("read") or {}).get("columns") or {}
    names: list[str] = []
    if isinstance(declared, dict):
        names.extend(str(key) for key in declared)
    names.extend(m.group(1) for m in _QUOTED_IDENTIFIER_RE.finditer(sql))
    return _dedupe_preserve_order(names)


def _quoted_identifier(name: str) -> str:
return '"' + name.replace('"', '""') + '"'


def _normalize_sql(sql: str) -> str:
return sql.strip().rstrip(";").strip()


def _create_placeholder_raw_input(con: duckdb.DuckDBPyConnection, clean_cfg: dict[str, Any], sql: str) -> None:
    """Create the zero-row raw_input view from columns inferred out of config and SQL."""
    inferred = _placeholder_columns(clean_cfg, sql)
    _create_placeholder_raw_input_with_columns(con, inferred)


def _create_placeholder_raw_input_with_columns(
    con: duckdb.DuckDBPyConnection,
    columns: list[str],
) -> None:
    """(Re)create the zero-row ``raw_input`` view with the given VARCHAR columns.

    With no columns at all, a single dummy column keeps the view definition
    syntactically valid.
    """
    if not columns:
        projection = "NULL::VARCHAR AS __dry_run_placeholder"
    else:
        parts = [f"NULL::VARCHAR AS {_quoted_identifier(name)}" for name in columns]
        projection = ", ".join(parts)
    con.execute(f"CREATE OR REPLACE VIEW raw_input AS SELECT {projection} LIMIT 0")


def _extract_missing_binder_column(exc: Exception) -> str | None:
    """Pull the missing column name out of a DuckDB binder error, if present."""
    found = _BINDER_MISSING_COLUMN_RE.search(str(exc))
    return found.group(1) if found else None


def _build_clean_preview(
    cfg,
    *,
    year: int,
    con: duckdb.DuckDBPyConnection,
) -> None:
    """Materialize a zero-row preview of the CLEAN output as ``__dry_run_clean_preview``.

    Incremental fallback: when clean.sql uses raw columns that are unquoted
    and not declared in clean.read.columns, DuckDB's binder reports the
    missing name. We add it to the placeholder and retry, so the dry-run
    avoids trivial false positives without attempting to parse full SQL.

    Raises:
        ValueError: when the CLEAN SQL fails for a non-recoverable reason or
            the inference retry budget is exhausted.
    """
    clean_sql_path, clean_sql, _ = _load_clean_sql(
        cfg.clean,
        dataset=cfg.dataset,
        year=year,
        base_dir=cfg.base_dir,
    )
    clean_sql = _normalize_sql(clean_sql)
    columns = _placeholder_columns(cfg.clean, clean_sql)

    # Bounded retry budget: each pass may learn at most one new column name.
    attempts = 25
    while attempts > 0:
        attempts -= 1
        _create_placeholder_raw_input_with_columns(con, columns)
        try:
            con.execute(
                f"CREATE OR REPLACE TABLE __dry_run_clean_preview AS SELECT * FROM ({clean_sql}) AS q LIMIT 0"
            )
        except Exception as exc:
            missing = _extract_missing_binder_column(exc)
            if not missing or missing in columns:
                # Not a recoverable "missing column" case: surface the failure.
                raise ValueError(f"CLEAN SQL dry-run failed ({clean_sql_path}): {exc}") from exc
            columns.append(missing)
        else:
            return

    raise ValueError(
        f"CLEAN SQL dry-run failed ({clean_sql_path}): exceeded placeholder inference attempts"
    )


def _validate_mart_sql(cfg, *, year: int, con: duckdb.DuckDBPyConnection) -> None:
    """EXPLAIN every mart table's SQL against the clean preview schema.

    Exposes the preview under both ``clean_input`` and ``clean`` names, then
    renders and plan-checks each configured mart table without executing it.

    Raises:
        ValueError: when any mart SQL fails DuckDB's planning step.
    """
    con.execute("CREATE OR REPLACE VIEW clean_input AS SELECT * FROM __dry_run_clean_preview")
    con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input")

    context = {"year": year, "dataset": cfg.dataset}
    for table in cfg.mart.get("tables") or []:
        table_name = table.get("name")
        path = _resolve_mart_sql_path(table.get("sql"), base_dir=cfg.base_dir)
        rendered = _normalize_sql(render_template(path.read_text(encoding="utf-8"), context))
        try:
            # EXPLAIN only: validates binding/planning with zero data movement.
            con.execute(f"EXPLAIN SELECT * FROM ({rendered}) AS q LIMIT 0")
        except Exception as exc:
            raise ValueError(f"MART SQL dry-run failed ({table_name}, {path}): {exc}") from exc


def validate_sql_dry_run(cfg, *, year: int, layers: list[str]) -> None:
    """Validate CLEAN/MART SQL against placeholder schemas in an in-memory DuckDB.

    Today the check covers only CLEAN and MART; cross_year stays out because
    it has a different input contract and needs its own dedicated validation.
    """
    if {"clean", "mart"}.isdisjoint(layers):
        return

    con = duckdb.connect(":memory:")
    try:
        # The clean preview is always needed: mart validates against it.
        _build_clean_preview(cfg, year=year, con=con)
        if "mart" in layers:
            _validate_mart_sql(cfg, year=year, con=con)
    finally:
        con.close()
Loading