Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,117 @@ def test_load_config_uses_base_dir_when_root_missing_and_dcl_root_missing(tmp_pa
assert cfg.root_source == "base_dir_fallback"


@pytest.mark.parametrize(
("dataset_rel", "root_value"),
[
("candidates/demo_dataset", "../../out"),
("candidates/demo_dataset/sources/demo_source", "../../../../out"),
("support_datasets/demo_support", "../../out"),
],
)
def test_load_config_resolves_repo_out_for_dataset_incubator_layouts(
tmp_path: Path,
dataset_rel: str,
root_value: str,
):
repo_root = tmp_path / "dataset-incubator"
dataset_dir = repo_root / Path(dataset_rel)
dataset_dir.mkdir(parents=True, exist_ok=True)
yml = dataset_dir / "dataset.yml"
yml.write_text(
f"""
root: "{root_value}"
dataset:
name: demo
years: [2022]
raw: {{}}
clean: {{}}
mart: {{}}
""".strip(),
encoding="utf-8",
)

cfg = load_config(yml, repo_root=repo_root)

assert cfg.root == (repo_root / "out").resolve()
assert cfg.root_source == "yml"


def test_load_config_accepts_absolute_root_within_repo_when_repo_root_is_provided(tmp_path: Path):
repo_root = tmp_path / "dataset-incubator"
dataset_dir = repo_root / "candidates" / "demo_dataset"
dataset_dir.mkdir(parents=True, exist_ok=True)
allowed_root = (repo_root / "out").resolve()
yml = dataset_dir / "dataset.yml"
yml.write_text(
f"""
root: "{allowed_root.as_posix()}"
dataset:
name: demo
years: [2022]
raw: {{}}
clean: {{}}
mart: {{}}
""".strip(),
encoding="utf-8",
)

cfg = load_config(yml, repo_root=repo_root)

assert cfg.root == allowed_root
assert cfg.root_source == "yml"


def test_load_config_rejects_root_outside_repo_when_repo_root_is_provided(tmp_path: Path):
repo_root = tmp_path / "dataset-incubator"
dataset_dir = repo_root / "candidates" / "demo_dataset"
dataset_dir.mkdir(parents=True, exist_ok=True)
outside_root = tmp_path / "outside"
yml = dataset_dir / "dataset.yml"
yml.write_text(
f"""
root: "{outside_root.as_posix()}"
dataset:
name: demo
years: [2022]
raw: {{}}
clean: {{}}
mart: {{}}
""".strip(),
encoding="utf-8",
)

with pytest.raises(ValueError) as exc:
load_config(yml, repo_root=repo_root)

assert "root resolves outside repo_root" in str(exc.value)
assert str(outside_root.resolve()) in str(exc.value)
assert str(repo_root.resolve()) in str(exc.value)


def test_load_config_allows_root_outside_repo_without_repo_root_guard(tmp_path: Path):
repo_root = tmp_path / "dataset-incubator"
dataset_dir = repo_root / "candidates" / "demo_dataset"
dataset_dir.mkdir(parents=True, exist_ok=True)
yml = dataset_dir / "dataset.yml"
yml.write_text(
"""
root: "../../../outside"
dataset:
name: demo
years: [2022]
raw: {}
clean: {}
mart: {}
""".strip(),
encoding="utf-8",
)

cfg = load_config(yml)

assert cfg.root == (tmp_path / "outside").resolve()


def test_load_config_rejects_legacy_clean_read_csv_shape(tmp_path: Path):
project_dir = tmp_path / "project"
project_dir.mkdir()
Expand Down
9 changes: 7 additions & 2 deletions toolkit/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,13 @@ def _compat_cross_year(model: ToolkitConfigModel) -> dict[str, Any]:
)


def load_config(path: str | Path, *, strict_config: bool = False) -> ToolkitConfig:
model = load_config_model(path, strict_config=strict_config)
def load_config(
path: str | Path,
*,
strict_config: bool = False,
repo_root: str | Path | None = None,
) -> ToolkitConfig:
model = load_config_model(path, strict_config=strict_config, repo_root=repo_root)
return ToolkitConfig(
base_dir=model.base_dir,
schema_version=model.schema_version,
Expand Down
45 changes: 44 additions & 1 deletion toolkit/core/config_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,26 @@ def _resolve_root(root: Any, *, base_dir: Path) -> tuple[Path, str]:
source = "env:TOOLKIT_OUTDIR" if os.environ.get("TOOLKIT_OUTDIR") else "env:DCL_OUTDIR"
return Path(managed_outdir).expanduser().resolve(), source
return _resolve_path_value(root, base_dir=base_dir), "yml"


def _ensure_root_within_repo(root: Path, *, repo_root: Path, path: Path) -> Path:
"""
Verify that `root` is contained within `repo_root` using resolved paths.
Both `root` and `repo_root` must already be fully resolved by the caller.
Returns `root` unchanged on success; raises ValueError on violation.
`path` is the config file path, used only for error context.
Note: this guard checks only the output root directory, not SQL input paths.
"""
try:
root.relative_to(repo_root)
except ValueError as exc:
raise _err(
f"root resolves outside repo_root: root={root} repo_root={repo_root}",
path=path,
) from exc
return root


def _emit_deprecation_notice(
key: str,
*,
Expand Down Expand Up @@ -730,7 +750,22 @@ def _read_strict_config(data: dict[str, Any], *, path: Path) -> bool:
return parse_bool(strict_value, "config.strict")


def load_config_model(path: str | Path, *, strict_config: bool = False) -> ToolkitConfigModel:
def load_config_model(
path: str | Path,
*,
strict_config: bool = False,
repo_root: str | Path | None = None,
) -> ToolkitConfigModel:
"""
Load and normalize toolkit config.

repo_root is an optional guardrail for callers that need to enforce that
the resolved effective root stays inside a known repository tree. This is
intentionally opt-in so the toolkit can still support valid workflows that
write outputs outside the project directory. A typical caller is external
CI that validates dataset.yml contracts for monorepos such as
dataset-incubator.
"""
p = Path(path)
base_dir = p.parent.resolve()

Expand All @@ -752,6 +787,14 @@ def load_config_model(path: str | Path, *, strict_config: bool = False) -> Toolk
normalized = _normalize_legacy_payload(data, path=p, strict_config=strict_mode)
normalized = _warn_or_reject_unknown_keys(normalized, path=p, strict_config=strict_mode)
root_path, root_source = _resolve_root(normalized.get("root"), base_dir=base_dir)
if repo_root is not None:
repo_root_path = Path(repo_root).expanduser().resolve()
if not repo_root_path.is_dir():
raise _err(
f"repo_root does not exist or is not a directory: {repo_root_path}",
path=p,
)
root_path = _ensure_root_within_repo(root_path, repo_root=repo_root_path, path=p)

raw = normalized.get("raw", {}) or {}
clean = normalized.get("clean", {}) or {}
Expand Down
Loading