diff --git a/tests/test_artifacts_policy.py b/tests/test_artifacts_policy.py index e06eb87..c0642db 100644 --- a/tests/test_artifacts_policy.py +++ b/tests/test_artifacts_policy.py @@ -63,7 +63,16 @@ def test_artifacts_policy_minimal_skips_optional_outputs(tmp_path: Path, monkeyp ) profile_cmd(step="raw", config=str(config_path)) run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output) - run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output) + run_mart( + cfg.dataset, + year, + cfg.root, + cfg.mart, + logger, + base_dir=cfg.base_dir, + clean_cfg=cfg.clean, + output_cfg=cfg.output, + ) run_clean_validation(cfg, year, logger) run_mart_validation(cfg, year, logger) @@ -119,7 +128,16 @@ def test_artifacts_policy_standard_keeps_current_debug_artifacts(tmp_path: Path, ) profile_cmd(step="raw", config=str(config_path)) run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output) - run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output) + run_mart( + cfg.dataset, + year, + cfg.root, + cfg.mart, + logger, + base_dir=cfg.base_dir, + clean_cfg=cfg.clean, + output_cfg=cfg.output, + ) run_clean_validation(cfg, year, logger) run_mart_validation(cfg, year, logger) @@ -183,7 +201,16 @@ def test_run_mart_supports_root_posix_placeholder(tmp_path: Path) -> None: cfg = load_config(config_path) logger = _NoopLogger() - result = run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output) + result = run_mart( + cfg.dataset, + year, + cfg.root, + cfg.mart, + logger, + base_dir=cfg.base_dir, + clean_cfg=cfg.clean, + output_cfg=cfg.output, + ) mart_output = root_dir / "data" / "mart" / dataset / str(year) / "mart_example.parquet" assert mart_output.exists() diff --git a/tests/test_project_example_e2e.py b/tests/test_project_example_e2e.py index 43ec332..e77d75d 100644 
--- a/tests/test_project_example_e2e.py +++ b/tests/test_project_example_e2e.py @@ -58,7 +58,16 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch): clean_cfg=cfg.clean, ) run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output) - run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output) + run_mart( + cfg.dataset, + year, + cfg.root, + cfg.mart, + logger, + base_dir=cfg.base_dir, + clean_cfg=cfg.clean, + output_cfg=cfg.output, + ) validate_cmd(step="clean", config=str(dst / "dataset.yml")) validate_cmd(step="mart", config=str(dst / "dataset.yml")) @@ -200,7 +209,16 @@ def test_project_example_outputs_can_be_replaced_after_run(tmp_path: Path, monke clean_cfg=cfg.clean, ) run_clean(cfg.dataset, year, cfg.root, cfg.clean, logger, base_dir=cfg.base_dir, output_cfg=cfg.output) - run_mart(cfg.dataset, year, cfg.root, cfg.mart, logger, base_dir=cfg.base_dir, output_cfg=cfg.output) + run_mart( + cfg.dataset, + year, + cfg.root, + cfg.mart, + logger, + base_dir=cfg.base_dir, + clean_cfg=cfg.clean, + output_cfg=cfg.output, + ) root = Path(cfg.root) clean_parquet = root / "data" / "clean" / cfg.dataset / str(year) / f"{cfg.dataset}_{year}_clean.parquet" diff --git a/tests/test_run_dry_run.py b/tests/test_run_dry_run.py index 9e2ec8c..d6ed8da 100644 --- a/tests/test_run_dry_run.py +++ b/tests/test_run_dry_run.py @@ -249,3 +249,194 @@ def test_run_year_logs_effective_root_context(tmp_path: Path, caplog) -> None: assert f"base_dir={tmp_path}" in caplog.text assert f"effective_root={root_dir}" in caplog.text assert "root_source=yml" in caplog.text + + +def test_run_dry_run_accepts_mart_only_config(tmp_path: Path) -> None: + mart_sql = tmp_path / "compose" / "sql" + mart_sql.mkdir(parents=True, exist_ok=True) + source_path = tmp_path / "external_source.parquet" + + con = duckdb.connect() + con.execute(f"COPY (SELECT 1 AS value) TO '{source_path.as_posix()}' (FORMAT PARQUET)")
+ con.close() + + (mart_sql / "mart_example.sql").write_text( + f"select * from read_parquet('{source_path.as_posix()}')", + encoding="utf-8", + ) + + config_path = tmp_path / "compose" / "dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "compose_demo"', + " years: [2022]", + "raw: {}", + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "mart", "--config", str(config_path), "--dry-run"]) + + assert result.exit_code == 0 + assert "Execution Plan" in result.output + assert "steps: mart" in result.output + assert "sql_validation: OK" in result.output + + +def test_run_dry_run_all_fails_readably_on_mart_only_config(tmp_path: Path) -> None: + mart_sql = tmp_path / "compose" / "sql" + mart_sql.mkdir(parents=True, exist_ok=True) + (mart_sql / "mart_example.sql").write_text("select 1 as value", encoding="utf-8") + + config_path = tmp_path / "compose" / "dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "compose_demo"', + " years: [2022]", + "raw: {}", + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "all", "--config", str(config_path), "--dry-run"]) + + assert result.exit_code != 0 + assert "run all is not supported for mart-only / compose-only configs" in str(result.exception) + + +def test_run_mart_executes_mart_only_config(tmp_path: Path) -> None: + mart_sql = tmp_path / "compose" / "sql" + mart_sql.mkdir(parents=True, exist_ok=True) + source_path = tmp_path / "external_source.parquet" + + con = duckdb.connect() + con.execute(f"COPY (SELECT 1 AS value) TO '{source_path.as_posix()}' (FORMAT PARQUET)")
+ con.close() + + (mart_sql / "mart_example.sql").write_text( + f"select * from read_parquet('{source_path.as_posix()}')", + encoding="utf-8", + ) + + config_path = tmp_path / "compose" / "dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "compose_demo"', + " years: [2022]", + "raw: {}", + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "mart", "--config", str(config_path)]) + + assert result.exit_code == 0 + mart_dir = root_dir / "data" / "mart" / "compose_demo" / "2022" + assert (mart_dir / "mart_example.parquet").exists() + assert (mart_dir / "metadata.json").exists() + assert not (root_dir / "data" / "clean" / "compose_demo" / "2022").exists() + + +def test_run_mart_mart_only_ignores_stale_clean_dir(tmp_path: Path) -> None: + mart_sql = tmp_path / "compose" / "sql" + mart_sql.mkdir(parents=True, exist_ok=True) + + config_path = tmp_path / "compose" / "dataset.yml" + root_dir = tmp_path / "out" + stale_clean_dir = root_dir / "data" / "clean" / "compose_demo" / "2022" + stale_clean_dir.mkdir(parents=True, exist_ok=True) + stale_clean_path = stale_clean_dir / "compose_demo_2022_clean.parquet" + duckdb.execute( + f"COPY (SELECT 1 AS stale_value) TO '{stale_clean_path.as_posix()}' (FORMAT PARQUET)" + ) + + (mart_sql / "mart_example.sql").write_text( + "select stale_value from clean_input", + encoding="utf-8", + ) + + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "compose_demo"', + " years: [2022]", + "raw: {}", + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "mart", "--config", str(config_path)]) + + assert 
result.exit_code != 0 + assert "clean_input" in str(result.exception) + mart_output = root_dir / "data" / "mart" / "compose_demo" / "2022" / "mart_example.parquet" + assert not mart_output.exists() + + +def test_run_all_fails_readably_on_mart_only_config(tmp_path: Path) -> None: + mart_sql = tmp_path / "compose" / "sql" + mart_sql.mkdir(parents=True, exist_ok=True) + (mart_sql / "mart_example.sql").write_text("select 1 as value", encoding="utf-8") + + config_path = tmp_path / "compose" / "dataset.yml" + root_dir = tmp_path / "out" + config_path.write_text( + "\n".join( + [ + f'root: "{root_dir.as_posix()}"', + "dataset:", + ' name: "compose_demo"', + " years: [2022]", + "raw: {}", + "mart:", + " tables:", + ' - name: "mart_example"', + ' sql: "sql/mart_example.sql"', + ] + ), + encoding="utf-8", + ) + + runner = CliRunner() + result = runner.invoke(app, ["run", "all", "--config", str(config_path)]) + + assert result.exit_code != 0 + assert "run all is not supported for mart-only / compose-only configs" in str(result.exception) diff --git a/toolkit/cli/cmd_run.py b/toolkit/cli/cmd_run.py index 9255996..4141679 100644 --- a/toolkit/cli/cmd_run.py +++ b/toolkit/cli/cmd_run.py @@ -50,10 +50,25 @@ def _resolve_sql_path(cfg, rel_path: str | None) -> Path: return Path(cfg.base_dir) / path +def _is_mart_only_cfg(cfg) -> bool: + return not bool(cfg.clean.get("sql")) + + def _validate_execution_plan(cfg, step: str) -> list[str]: layers = _planned_layers(step) + if step == "all" and _is_mart_only_cfg(cfg): + raise ValueError( + "run all is not supported for mart-only / compose-only configs; " + "use: toolkit run mart --config ...", + ) + if "clean" in layers: + if _is_mart_only_cfg(cfg): + raise ValueError( + "run clean is not supported for mart-only / compose-only configs; " + "use: toolkit run mart --config ...", + ) clean_sql = _resolve_sql_path(cfg, cfg.clean.get("sql")) if not clean_sql.exists(): raise FileNotFoundError(f"CLEAN SQL file not found: {clean_sql}") @@ -270,6 
+285,7 @@ def _execute_layer(layer_name: str, target, *args, **kwargs) -> None: cfg.root, cfg.mart, base_dir=cfg.base_dir, + clean_cfg=cfg.clean, output_cfg=cfg.output, ) diff --git a/toolkit/cli/sql_dry_run.py b/toolkit/cli/sql_dry_run.py index 400614e..a527b00 100644 --- a/toolkit/cli/sql_dry_run.py +++ b/toolkit/cli/sql_dry_run.py @@ -109,8 +109,9 @@ def _build_clean_preview( def _validate_mart_sql(cfg, *, year: int, con: duckdb.DuckDBPyConnection) -> None: - con.execute("CREATE OR REPLACE VIEW clean_input AS SELECT * FROM __dry_run_clean_preview") - con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input") + if cfg.clean.get("sql"): + con.execute("CREATE OR REPLACE VIEW clean_input AS SELECT * FROM __dry_run_clean_preview") + con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input") tables = cfg.mart.get("tables") or [] template_ctx = build_runtime_template_ctx( @@ -139,7 +140,8 @@ def validate_sql_dry_run(cfg, *, year: int, layers: list[str]) -> None: con = duckdb.connect(":memory:") try: - _build_clean_preview(cfg, year=year, con=con) + if cfg.clean.get("sql"): + _build_clean_preview(cfg, year=year, con=con) if "mart" in layers: _validate_mart_sql(cfg, year=year, con=con) finally: diff --git a/toolkit/mart/run.py b/toolkit/mart/run.py index c557a74..4471a27 100644 --- a/toolkit/mart/run.py +++ b/toolkit/mart/run.py @@ -36,6 +36,7 @@ def run_mart( logger, *, base_dir: Path | None = None, + clean_cfg: dict[str, Any] | None = None, output_cfg: dict[str, Any] | None = None, ): policy = resolve_artifact_policy(output_cfg) @@ -44,24 +45,29 @@ def run_mart( mart_dir = layer_year_dir(root, "mart", dataset, year) mart_dir.mkdir(parents=True, exist_ok=True) - if not clean_dir.exists(): - raise FileNotFoundError(f"CLEAN dir not found: {clean_dir}. 
Run: toolkit run clean -c dataset.yml") - - clean_files = list(clean_dir.glob("*.parquet")) - if not clean_files: - raise FileNotFoundError(f"No CLEAN parquet found in {clean_dir}") + clean_sql_configured = bool((clean_cfg or {}).get("sql")) + clean_files: list[Path] = [] + if clean_sql_configured: + if not clean_dir.exists(): + raise FileNotFoundError( + f"CLEAN dir not found: {clean_dir}. Run: toolkit run clean -c dataset.yml" + ) + clean_files = list(clean_dir.glob("*.parquet")) + if not clean_files: + raise FileNotFoundError(f"No CLEAN parquet found in {clean_dir}") con = duckdb.connect(":memory:") try: - # clean_input view - if len(clean_files) == 1: - con.execute(f"CREATE VIEW clean_input AS SELECT * FROM read_parquet('{clean_files[0]}')") - else: - paths = ",".join([f"'{p}'" for p in clean_files]) - con.execute(f"CREATE VIEW clean_input AS SELECT * FROM read_parquet([{paths}])") - - # alias for backward-compatible SQL (old templates may reference "clean") - con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input") + if clean_files: + # clean_input view + if len(clean_files) == 1: + con.execute(f"CREATE VIEW clean_input AS SELECT * FROM read_parquet('{clean_files[0]}')") + else: + paths = ",".join([f"'{p}'" for p in clean_files]) + con.execute(f"CREATE VIEW clean_input AS SELECT * FROM read_parquet([{paths}])") + + # alias for backward-compatible SQL (old templates may reference "clean") + con.execute("CREATE OR REPLACE VIEW clean AS SELECT * FROM clean_input") tables = mart_cfg.get("tables") or [] if not isinstance(tables, list) or not tables: diff --git a/toolkit/mart/validate.py b/toolkit/mart/validate.py index d2d3752..974bb75 100644 --- a/toolkit/mart/validate.py +++ b/toolkit/mart/validate.py @@ -226,7 +226,7 @@ def run_mart_validation(cfg, year: int, logger) -> dict[str, Any]: spec = MartValidationSpec.model_validate( { "required_tables": mart_cfg.get("required_tables"), - "validate": mart_cfg.get("validate"), + "validate": 
mart_cfg.get("validate") or {}, } )