Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/pystatsv1/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,30 @@ def cmd_doctor(args: argparse.Namespace) -> int:
return 1


def cmd_trackd_validate(args: argparse.Namespace) -> int:
    """Handle ``trackd validate``: check a dataset folder against a profile.

    Returns 0 when validation passes, 1 with a friendly message otherwise.
    """
    # CLI wiring stays thin: all checking lives in pystatsv1.trackd.validate.
    from pystatsv1.trackd import TrackDDataError, TrackDSchemaError
    from pystatsv1.trackd.validate import validate_dataset

    try:
        validate_dataset(args.datadir, profile=args.profile)
    except (TrackDDataError, TrackDSchemaError) as err:
        # The validator already formats a student-friendly report.
        print(str(err))
        return 1

    resolved = Path(args.datadir).expanduser()
    summary = "\n".join(
        [
            "✅ Track D dataset looks valid.",
            "",
            f"Profile: {args.profile}",
            f"Data directory: {resolved}",
        ]
    )
    print(summary)
    return 0


def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="pystatsv1",
Expand Down Expand Up @@ -473,6 +497,26 @@ def build_parser() -> argparse.ArgumentParser:
)
p_check.set_defaults(func=cmd_workbook_check)

p_trackd = sub.add_parser("trackd", help="Track D helpers (business datasets).")
td_sub = p_trackd.add_subparsers(dest="trackd_cmd", required=True)

p_td_validate = td_sub.add_parser(
"validate",
help="Validate a Track D dataset folder against a profile (BYOD foundations).",
)
p_td_validate.add_argument(
"--datadir",
required=True,
help="Path to the folder containing exported Track D CSV tables.",
)
p_td_validate.add_argument(
"--profile",
default="full",
choices=["core_gl", "ar", "full"],
help="Which profile to validate (default: full).",
)
p_td_validate.set_defaults(func=cmd_trackd_validate)


return p

Expand Down
53 changes: 53 additions & 0 deletions src/pystatsv1/trackd/contracts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# SPDX-License-Identifier: MIT
"""Dataset profile contracts for Track D.

Phase 2 (BYOD foundations) introduces *profiles* that let users validate smaller
subsets of the full Track D dataset contract.

Profiles are defined as ordered subsets of the existing Track D contract tables
in :mod:`pystatsv1.trackd.schema`.
"""

from __future__ import annotations

from typing import Final

from .schema import CONTRACT_TABLES, NSO_V1_TABLE_ORDER, TableSchema


# Ordered subsets of the existing Track D contract.
#
# Each value is a tuple of table keys into
# pystatsv1.trackd.schema.CONTRACT_TABLES, listed in the order they should be
# validated/presented.
#
# - core_gl: the smallest General Ledger "starter" export (Chart of Accounts + GL)
# - ar: adds accounts receivable events
# - full: all Track D tables in canonical workbook order
PROFILE_TABLE_KEYS: Final[dict[str, tuple[str, ...]]] = {
    "core_gl": (
        "chart_of_accounts",
        "gl_journal",
    ),
    "ar": (
        "chart_of_accounts",
        "gl_journal",
        "ar_events",
    ),
    "full": NSO_V1_TABLE_ORDER,
}


# All valid profile names in declaration order; used in error messages
# (and mirrored by the CLI's --profile choices).
ALLOWED_PROFILES: Final[tuple[str, ...]] = tuple(PROFILE_TABLE_KEYS.keys())


def schemas_for_profile(profile: str) -> tuple[TableSchema, ...]:
    """Return the ordered :class:`~pystatsv1.trackd.schema.TableSchema` tuple for a profile.

    The profile name is normalized (stripped, lower-cased) before lookup.
    Raises :class:`ValueError` for unknown profiles or profiles that name
    tables absent from the contract.
    """
    normalized = (profile or "").strip().lower()
    if normalized not in PROFILE_TABLE_KEYS:
        raise ValueError(
            f"Unknown profile: {profile}. Use one of: {', '.join(ALLOWED_PROFILES)}"
        )
    keys = PROFILE_TABLE_KEYS[normalized]

    # Defensive: every key a profile lists must exist in the contract.
    unknown = [key for key in keys if key not in CONTRACT_TABLES]
    if unknown:
        raise ValueError(
            f"Profile '{normalized}' references unknown contract tables: {', '.join(unknown)}"
        )

    return tuple(CONTRACT_TABLES[key] for key in keys)
133 changes: 133 additions & 0 deletions src/pystatsv1/trackd/validate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# SPDX-License-Identifier: MIT
"""Validation helpers for Track D dataset folders.

Unlike :func:`pystatsv1.trackd.loaders.load_table`, which is intentionally
fail-fast, the functions here aggregate all schema issues into one friendly
summary that is suitable for student-facing CLI output.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

import pandas as pd

from ._errors import TrackDDataError, TrackDSchemaError
from ._types import PathLike
from .contracts import ALLOWED_PROFILES, schemas_for_profile
from .loaders import resolve_datadir


def _read_header(path: Path) -> list[str]:
    """Return the CSV header of *path* as a list of column-name strings.

    Reads zero data rows (``nrows=0``) so the call stays cheap even for
    large exports.
    """
    header_only = pd.read_csv(path, nrows=0)
    return [str(name) for name in header_only.columns]


def _format_report(report: dict[str, Any]) -> str:
    """Render a failed-validation *report* dict as a human-readable summary.

    Expects the keys produced by :func:`validate_dataset`:
    profile, datadir, missing_files, missing_columns, found_columns.
    """
    profile_text = str(report.get("profile", ""))
    datadir_text = str(report.get("datadir", ""))

    absent_files: list[str] = list(report.get("missing_files", []))
    absent_cols: dict[str, list[str]] = dict(report.get("missing_columns", {}))
    header_cols: dict[str, list[str]] = dict(report.get("found_columns", {}))

    out: list[str] = [
        "Track D dataset validation failed.",
        f"Profile: {profile_text}",
        f"Data directory: {datadir_text}",
        "",
    ]

    if absent_files:
        out.append("Missing CSV files:")
        out.extend(f" - {n}" for n in absent_files)
        out.append("")

    if absent_cols:
        out.append("CSV files with missing required columns:")
        # Sorted for stable, diff-friendly output.
        for table in sorted(absent_cols):
            out.append(f" - {table}: missing {', '.join(absent_cols[table])}")
            seen = header_cols.get(table)
            if seen:
                out.append(f" Found columns: {', '.join(seen)}")
        out.append("")

    out.append("Fix: export the required CSV(s) and ensure the header names match the Track D contract.")
    out.append("Tip: compare your exported CSV headers against the workbook downloads.")
    return "\n".join(out)


def validate_dataset(datadir: PathLike | None, *, profile: str = "full") -> dict[str, Any]:
    """Validate a Track D dataset folder against a profile.

    Parameters
    ----------
    datadir:
        Folder containing CSV inputs.
    profile:
        One of: core_gl, ar, full.

    Returns
    -------
    dict
        A report dict with keys:
        ok, profile, datadir, missing_files, missing_columns, found_columns.

    Raises
    ------
    TrackDDataError
        If *datadir* is missing or not a directory, or the profile is unknown.
    TrackDSchemaError
        If required files or required columns are missing.
    """

    root = resolve_datadir(datadir)

    normalized = (profile or "").strip().lower()
    try:
        schemas = schemas_for_profile(normalized)
    except ValueError as e:
        # Re-raise as a Track D error so the CLI can treat it uniformly.
        raise TrackDDataError(
            f"Unknown profile: {profile}.\n"
            f"Use one of: {', '.join(ALLOWED_PROFILES)}"
        ) from e

    missing_files: list[str] = []
    missing_columns: dict[str, list[str]] = {}
    found_columns: dict[str, list[str]] = {}

    for schema in schemas:
        table_path = root / schema.name
        if not table_path.exists():
            missing_files.append(schema.name)
            continue

        cols = _read_header(table_path)
        found_columns[schema.name] = cols
        # Build the membership set once per table instead of once per
        # required column (the original rebuilt set(cols) inside the
        # comprehension for every column checked).
        col_set = set(cols)
        missing = [c for c in schema.required_columns if c not in col_set]
        if missing:
            missing_columns[schema.name] = missing

    # Assemble the report once; both the success return and the failure
    # message share the same structure.
    report: dict[str, Any] = {
        "ok": not (missing_files or missing_columns),
        "profile": normalized,
        "datadir": str(root),
        "missing_files": missing_files,
        "missing_columns": missing_columns,
        "found_columns": found_columns,
    }

    if not report["ok"]:
        raise TrackDSchemaError(_format_report(report))
    return report
60 changes: 60 additions & 0 deletions tests/test_trackd_validate_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

from pathlib import Path

from pystatsv1.cli import main


def _write_chart_of_accounts(p: Path, *, missing_normal_side: bool = False) -> None:
    """Write a minimal chart_of_accounts.csv; optionally omit normal_side."""
    header = ["account_id", "account_name", "account_type"]
    values = ["1", "Cash", "asset"]
    if not missing_normal_side:
        header.append("normal_side")
        values.append("debit")
    p.write_text(f"{','.join(header)}\n{','.join(values)}\n", encoding="utf-8")


def _write_gl_journal(p: Path, *, missing_credit: bool = False) -> None:
    """Write a minimal gl_journal.csv; optionally omit the credit column."""
    header = ["txn_id", "date", "doc_id", "description", "account_id", "debit"]
    values = ["t1", "2025-01-01", "d1", "Example", "1", "100"]
    if not missing_credit:
        header.append("credit")
        values.append("0")
    p.write_text(f"{','.join(header)}\n{','.join(values)}\n", encoding="utf-8")


def test_trackd_validate_missing_datadir_is_friendly(tmp_path: Path, capsys) -> None:
    """A nonexistent --datadir exits 1 with a hint, not a traceback."""
    bogus_dir = tmp_path / "nope"

    exit_code = main(["trackd", "validate", "--datadir", str(bogus_dir), "--profile", "core_gl"])
    captured = capsys.readouterr().out

    assert exit_code == 1
    assert "Data directory not found" in captured
    assert "Hint:" in captured


def test_trackd_validate_missing_csv_is_friendly(tmp_path: Path, capsys) -> None:
    """A missing required table file produces the aggregated failure report."""
    # Only chart_of_accounts.csv exists; gl_journal.csv is deliberately absent.
    _write_chart_of_accounts(tmp_path / "chart_of_accounts.csv")

    exit_code = main(["trackd", "validate", "--datadir", str(tmp_path), "--profile", "core_gl"])
    captured = capsys.readouterr().out

    assert exit_code == 1
    for expected in (
        "Track D dataset validation failed",
        "Profile: core_gl",
        "Missing CSV files",
        "gl_journal.csv",
    ):
        assert expected in captured


def test_trackd_validate_missing_required_columns_is_friendly(tmp_path: Path, capsys) -> None:
    """A table with missing required columns is reported with found columns."""
    _write_chart_of_accounts(tmp_path / "chart_of_accounts.csv")
    # gl_journal.csv exists but its header lacks the required "credit" column.
    _write_gl_journal(tmp_path / "gl_journal.csv", missing_credit=True)

    exit_code = main(["trackd", "validate", "--datadir", str(tmp_path), "--profile", "core_gl"])
    captured = capsys.readouterr().out

    assert exit_code == 1
    for expected in (
        "CSV files with missing required columns",
        "gl_journal.csv",
        "credit",
        "Found columns",
    ):
        assert expected in captured