Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions tests/test_artifacts_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,16 @@ def test_artifacts_policy_minimal_skips_optional_outputs(tmp_path: Path, monkeyp
)
profile_cmd(step="raw", config=str(config_path))
run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(
cfg.dataset,
year,
cfg.root,
cfg.mart,
logger,
base_dir=cfg.base_dir,
clean_cfg=cfg.clean,
output_cfg=cfg.output,
)
run_clean_validation(cfg, year, logger)
run_mart_validation(cfg, year, logger)

Expand Down Expand Up @@ -119,7 +128,16 @@ def test_artifacts_policy_standard_keeps_current_debug_artifacts(tmp_path: Path,
)
profile_cmd(step="raw", config=str(config_path))
run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(
cfg.dataset,
year,
cfg.root,
cfg.mart,
logger,
base_dir=cfg.base_dir,
clean_cfg=cfg.clean,
output_cfg=cfg.output,
)
run_clean_validation(cfg, year, logger)
run_mart_validation(cfg, year, logger)

Expand Down Expand Up @@ -183,7 +201,16 @@ def test_run_mart_supports_root_posix_placeholder(tmp_path: Path) -> None:

cfg = load_config(config_path)
logger = _NoopLogger()
result = run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
result = run_mart(
cfg.dataset,
year,
cfg.root,
cfg.mart,
logger,
base_dir=cfg.base_dir,
clean_cfg=cfg.clean,
output_cfg=cfg.output,
)

mart_output = root_dir / "data" / "mart" / dataset / str(year) / "mart_example.parquet"
assert mart_output.exists()
Expand Down
22 changes: 20 additions & 2 deletions tests/test_project_example_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,16 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch):
clean_cfg=cfg.clean,
)
run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(
cfg.dataset,
year,
cfg.root,
cfg.mart,
logger,
base_dir=cfg.base_dir,
clean_cfg=cfg.clean,
output_cfg=cfg.output,
)
validate_cmd(step="clean", config=str(dst / "dataset.yml"))
validate_cmd(step="mart", config=str(dst / "dataset.yml"))

Expand Down Expand Up @@ -200,7 +209,16 @@ def test_project_example_outputs_can_be_replaced_after_run(tmp_path: Path, monke
clean_cfg=cfg.clean,
)
run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output)
run_mart(
cfg.dataset,
year,
cfg.root,
cfg.mart,
logger,
base_dir=cfg.base_dir,
clean_cfg=cfg.clean,
output_cfg=cfg.output,
)

root = Path(cfg.root)
clean_parquet = root / "data" / "clean" / cfg.dataset / str(year) / f"{cfg.dataset}_{year}_clean.parquet"
Expand Down
191 changes: 191 additions & 0 deletions tests/test_run_dry_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,194 @@ def test_run_year_logs_effective_root_context(tmp_path: Path, caplog) -> None:
assert f"base_dir={tmp_path}" in caplog.text
assert f"effective_root={root_dir}" in caplog.text
assert "root_source=yml" in caplog.text


def test_run_dry_run_accepts_mart_only_config(tmp_path: Path) -> None:
    """`run mart --dry-run` accepts a config that defines no clean layer."""
    sql_dir = tmp_path / "compose" / "sql"
    sql_dir.mkdir(parents=True, exist_ok=True)

    # Materialize an external parquet file for the mart SQL to read directly.
    parquet_src = tmp_path / "external_source.parquet"
    conn = duckdb.connect()
    conn.execute("COPY (SELECT 1 AS value) TO ? (FORMAT PARQUET)", [str(parquet_src)])
    conn.close()

    query = f"select * from read_parquet('{parquet_src.as_posix()}')"
    (sql_dir / "mart_example.sql").write_text(query, encoding="utf-8")

    # Mart-only dataset config: "raw" is empty and no "clean" section exists.
    out_root = tmp_path / "out"
    yaml_lines = [
        f'root: "{out_root.as_posix()}"',
        "dataset:",
        ' name: "compose_demo"',
        " years: [2022]",
        "raw: {}",
        "mart:",
        " tables:",
        ' - name: "mart_example"',
        ' sql: "sql/mart_example.sql"',
    ]
    yml_path = tmp_path / "compose" / "dataset.yml"
    yml_path.write_text("\n".join(yaml_lines), encoding="utf-8")

    cli = CliRunner()
    outcome = cli.invoke(app, ["run", "mart", "--config", str(yml_path), "--dry-run"])

    assert outcome.exit_code == 0
    assert "Execution Plan" in outcome.output
    assert "steps: mart" in outcome.output
    assert "sql_validation: OK" in outcome.output


def test_run_dry_run_all_fails_readably_on_mart_only_config(tmp_path: Path) -> None:
    """`run all --dry-run` rejects a mart-only config with a readable error."""
    sql_dir = tmp_path / "compose" / "sql"
    sql_dir.mkdir(parents=True, exist_ok=True)
    (sql_dir / "mart_example.sql").write_text("select 1 as value", encoding="utf-8")

    # Mart-only dataset config: no clean SQL configured.
    out_root = tmp_path / "out"
    yaml_lines = [
        f'root: "{out_root.as_posix()}"',
        "dataset:",
        ' name: "compose_demo"',
        " years: [2022]",
        "raw: {}",
        "mart:",
        " tables:",
        ' - name: "mart_example"',
        ' sql: "sql/mart_example.sql"',
    ]
    yml_path = tmp_path / "compose" / "dataset.yml"
    yml_path.write_text("\n".join(yaml_lines), encoding="utf-8")

    cli = CliRunner()
    outcome = cli.invoke(app, ["run", "all", "--config", str(yml_path), "--dry-run"])

    assert outcome.exit_code != 0
    assert "run all is not supported for mart-only / compose-only configs" in str(outcome.exception)


def test_run_mart_executes_mart_only_config(tmp_path: Path) -> None:
    """`run mart` executes a mart-only config end-to-end without a clean layer."""
    sql_dir = tmp_path / "compose" / "sql"
    sql_dir.mkdir(parents=True, exist_ok=True)

    # External parquet source consumed directly by the mart SQL.
    parquet_src = tmp_path / "external_source.parquet"
    conn = duckdb.connect()
    conn.execute("COPY (SELECT 1 AS value) TO ? (FORMAT PARQUET)", [str(parquet_src)])
    conn.close()

    query = f"select * from read_parquet('{parquet_src.as_posix()}')"
    (sql_dir / "mart_example.sql").write_text(query, encoding="utf-8")

    out_root = tmp_path / "out"
    yaml_lines = [
        f'root: "{out_root.as_posix()}"',
        "dataset:",
        ' name: "compose_demo"',
        " years: [2022]",
        "raw: {}",
        "mart:",
        " tables:",
        ' - name: "mart_example"',
        ' sql: "sql/mart_example.sql"',
    ]
    yml_path = tmp_path / "compose" / "dataset.yml"
    yml_path.write_text("\n".join(yaml_lines), encoding="utf-8")

    cli = CliRunner()
    outcome = cli.invoke(app, ["run", "mart", "--config", str(yml_path)])

    assert outcome.exit_code == 0
    mart_dir = out_root / "data" / "mart" / "compose_demo" / "2022"
    assert (mart_dir / "mart_example.parquet").exists()
    assert (mart_dir / "metadata.json").exists()
    # A mart-only run must not create any CLEAN layer output.
    assert not (out_root / "data" / "clean" / "compose_demo" / "2022").exists()


def test_run_mart_mart_only_ignores_stale_clean_dir(tmp_path: Path) -> None:
    """With no clean layer configured, mart SQL must not see leftover CLEAN files."""
    sql_dir = tmp_path / "compose" / "sql"
    sql_dir.mkdir(parents=True, exist_ok=True)

    out_root = tmp_path / "out"
    # Plant a stale CLEAN parquet that a mart-only run should never register.
    stale_dir = out_root / "data" / "clean" / "compose_demo" / "2022"
    stale_dir.mkdir(parents=True, exist_ok=True)
    stale_file = stale_dir / "compose_demo_2022_clean.parquet"
    duckdb.execute(
        f"COPY (SELECT 1 AS stale_value) TO '{stale_file.as_posix()}' (FORMAT PARQUET)"
    )

    # Mart SQL deliberately references the clean_input view, which a
    # mart-only run is expected NOT to create.
    (sql_dir / "mart_example.sql").write_text(
        "select stale_value from clean_input",
        encoding="utf-8",
    )

    yaml_lines = [
        f'root: "{out_root.as_posix()}"',
        "dataset:",
        ' name: "compose_demo"',
        " years: [2022]",
        "raw: {}",
        "mart:",
        " tables:",
        ' - name: "mart_example"',
        ' sql: "sql/mart_example.sql"',
    ]
    yml_path = tmp_path / "compose" / "dataset.yml"
    yml_path.write_text("\n".join(yaml_lines), encoding="utf-8")

    cli = CliRunner()
    outcome = cli.invoke(app, ["run", "mart", "--config", str(yml_path)])

    # The run must fail on the missing clean_input view and write no output.
    assert outcome.exit_code != 0
    assert "clean_input" in str(outcome.exception)
    mart_file = out_root / "data" / "mart" / "compose_demo" / "2022" / "mart_example.parquet"
    assert not mart_file.exists()


def test_run_all_fails_readably_on_mart_only_config(tmp_path: Path) -> None:
    """A real (non-dry-run) `run all` rejects a mart-only config with a clear error."""
    sql_dir = tmp_path / "compose" / "sql"
    sql_dir.mkdir(parents=True, exist_ok=True)
    (sql_dir / "mart_example.sql").write_text("select 1 as value", encoding="utf-8")

    # Mart-only dataset config: no clean SQL configured.
    out_root = tmp_path / "out"
    yaml_lines = [
        f'root: "{out_root.as_posix()}"',
        "dataset:",
        ' name: "compose_demo"',
        " years: [2022]",
        "raw: {}",
        "mart:",
        " tables:",
        ' - name: "mart_example"',
        ' sql: "sql/mart_example.sql"',
    ]
    yml_path = tmp_path / "compose" / "dataset.yml"
    yml_path.write_text("\n".join(yaml_lines), encoding="utf-8")

    cli = CliRunner()
    outcome = cli.invoke(app, ["run", "all", "--config", str(yml_path)])

    assert outcome.exit_code != 0
    assert "run all is not supported for mart-only / compose-only configs" in str(outcome.exception)
16 changes: 16 additions & 0 deletions toolkit/cli/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,25 @@ def _resolve_sql_path(cfg, rel_path: str | None) -> Path:
return Path(cfg.base_dir) / path


def _is_mart_only_cfg(cfg) -> bool:
return not bool(cfg.clean.get("sql"))


def _validate_execution_plan(cfg, step: str) -> list[str]:
layers = _planned_layers(step)

if step == "all" and _is_mart_only_cfg(cfg):
raise ValueError(
"run all is not supported for mart-only / compose-only configs; "
"use: toolkit run mart --config ...",
)

if "clean" in layers:
if _is_mart_only_cfg(cfg):
raise ValueError(
"run clean is not supported for mart-only / compose-only configs; "
"use: toolkit run mart --config ...",
)
clean_sql = _resolve_sql_path(cfg, cfg.clean.get("sql"))
if not clean_sql.exists():
raise FileNotFoundError(f"CLEAN SQL file not found: {clean_sql}")
Expand Down Expand Up @@ -270,6 +285,7 @@ def _execute_layer(layer_name: str, target, *args, **kwargs) -> None:
cfg.root,
cfg.mart,
base_dir=cfg.base_dir,
clean_cfg=cfg.clean,
output_cfg=cfg.output,
)

Expand Down
8 changes: 5 additions & 3 deletions toolkit/cli/sql_dry_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ def _build_clean_preview(


def _validate_mart_sql(cfg, *, year: int, con: duckdb.DuckDBPyConnection) -> None:
con.execute("CREATE OR REPLACE VIEW clean_input AS SELECT * FROM __dry_run_clean_preview")
con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input")
if cfg.clean.get("sql"):
con.execute("CREATE OR REPLACE VIEW clean_input AS SELECT * FROM __dry_run_clean_preview")
con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input")

tables = cfg.mart.get("tables") or []
template_ctx = build_runtime_template_ctx(
Expand Down Expand Up @@ -139,7 +140,8 @@ def validate_sql_dry_run(cfg, *, year: int, layers: list[str]) -> None:

con = duckdb.connect(":memory:")
try:
_build_clean_preview(cfg, year=year, con=con)
if cfg.clean.get("sql"):
_build_clean_preview(cfg, year=year, con=con)
if "mart" in layers:
_validate_mart_sql(cfg, year=year, con=con)
finally:
Expand Down
36 changes: 21 additions & 15 deletions toolkit/mart/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def run_mart(
logger,
*,
base_dir: Path | None = None,
clean_cfg: dict[str, Any] | None = None,
output_cfg: dict[str, Any] | None = None,
):
policy = resolve_artifact_policy(output_cfg)
Expand All @@ -44,24 +45,29 @@ def run_mart(
mart_dir = layer_year_dir(root, "mart", dataset, year)
mart_dir.mkdir(parents=True, exist_ok=True)

if not clean_dir.exists():
raise FileNotFoundError(f"CLEAN dir not found: {clean_dir}. Run: toolkit run clean -c dataset.yml")

clean_files = list(clean_dir.glob("*.parquet"))
if not clean_files:
raise FileNotFoundError(f"No CLEAN parquet found in {clean_dir}")
clean_sql_configured = bool((clean_cfg or {}).get("sql"))
clean_files: list[Path] = []
if clean_sql_configured:
if not clean_dir.exists():
raise FileNotFoundError(
f"CLEAN dir not found: {clean_dir}. Run: toolkit run clean -c dataset.yml"
)
clean_files = list(clean_dir.glob("*.parquet"))
if not clean_files:
raise FileNotFoundError(f"No CLEAN parquet found in {clean_dir}")

con = duckdb.connect(":memory:")
try:
# clean_input view
if len(clean_files) == 1:
con.execute(f"CREATE VIEW clean_input AS SELECT * FROM read_parquet('{clean_files[0]}')")
else:
paths = ",".join([f"'{p}'" for p in clean_files])
con.execute(f"CREATE VIEW clean_input AS SELECT * FROM read_parquet([{paths}])")

# alias for backward-compatible SQL (old templates may reference "clean")
con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input")
if clean_files:
# clean_input view
if len(clean_files) == 1:
con.execute(f"CREATE VIEW clean_input AS SELECT * FROM read_parquet('{clean_files[0]}')")
else:
paths = ",".join([f"'{p}'" for p in clean_files])
con.execute(f"CREATE VIEW clean_input AS SELECT * FROM read_parquet([{paths}])")

# alias for backward-compatible SQL (old templates may reference "clean")
con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input")

tables = mart_cfg.get("tables") or []
if not isinstance(tables, list) or not tables:
Expand Down
Loading
Loading