def cmd_trackd_validate(args: argparse.Namespace) -> int:
    """Handle ``pystatsv1 trackd validate``.

    Thin CLI wrapper: all validation logic lives in
    :mod:`pystatsv1.trackd.validate`.

    Parameters
    ----------
    args:
        Parsed CLI namespace; uses ``args.datadir`` and ``args.profile``.

    Returns
    -------
    int
        Process exit code: 0 when the dataset validates, 1 on any
        validation failure (a friendly summary is printed to stdout).
    """
    # Keep CLI wiring lightweight: import lazily so unrelated subcommands
    # (and ``--help``) don't pay for the trackd machinery.
    from pystatsv1.trackd import TrackDDataError, TrackDSchemaError
    from pystatsv1.trackd.validate import validate_dataset

    try:
        validate_dataset(args.datadir, profile=args.profile)
    except (TrackDDataError, TrackDSchemaError) as e:
        # Student-facing summary goes to stdout (tests capture stdout);
        # the non-zero return code is the machine-readable failure signal.
        print(str(e))
        return 1

    # NOTE: fixed stray trailing whitespace/control characters that were
    # embedded after "valid." in the success banner.
    print(
        textwrap.dedent(
            f"""\
            ✅ Track D dataset looks valid.

            Profile: {args.profile}
            Data directory: {Path(args.datadir).expanduser()}
            """
        ).rstrip()
    )
    return 0
# Ordered subsets of the existing Track D contract.
#
# - core_gl: the smallest General Ledger "starter" export (Chart of Accounts + GL)
# - ar: adds accounts receivable events
# - full: all Track D tables in canonical workbook order
PROFILE_TABLE_KEYS: Final[dict[str, tuple[str, ...]]] = {
    "core_gl": ("chart_of_accounts", "gl_journal"),
    "ar": ("chart_of_accounts", "gl_journal", "ar_events"),
    "full": NSO_V1_TABLE_ORDER,
}


ALLOWED_PROFILES: Final[tuple[str, ...]] = tuple(PROFILE_TABLE_KEYS)


def schemas_for_profile(profile: str) -> tuple[TableSchema, ...]:
    """Return the ordered :class:`~pystatsv1.trackd.schema.TableSchema` tuple for a profile."""

    normalized = (profile or "").strip().lower()
    if normalized not in PROFILE_TABLE_KEYS:
        raise ValueError(f"Unknown profile: {profile}. Use one of: {', '.join(ALLOWED_PROFILES)}")
    table_keys = PROFILE_TABLE_KEYS[normalized]

    # Defensive: ensure profile only references known contract tables.
    unknown = [key for key in table_keys if key not in CONTRACT_TABLES]
    if unknown:
        raise ValueError(f"Profile '{normalized}' references unknown contract tables: {', '.join(unknown)}")

    return tuple(CONTRACT_TABLES[key] for key in table_keys)
+""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pandas as pd + +from ._errors import TrackDDataError, TrackDSchemaError +from ._types import PathLike +from .contracts import ALLOWED_PROFILES, schemas_for_profile +from .loaders import resolve_datadir + + +def _read_header(path: Path) -> list[str]: + # Cheap header-only read for required-column validation. + df = pd.read_csv(path, nrows=0) + return [str(c) for c in df.columns] + + +def _format_report(report: dict[str, Any]) -> str: + profile = str(report.get("profile", "")) + datadir = str(report.get("datadir", "")) + + missing_files: list[str] = list(report.get("missing_files", [])) + missing_cols: dict[str, list[str]] = dict(report.get("missing_columns", {})) + found_cols: dict[str, list[str]] = dict(report.get("found_columns", {})) + + lines: list[str] = [ + "Track D dataset validation failed.", + f"Profile: {profile}", + f"Data directory: {datadir}", + "", + ] + + if missing_files: + lines += ["Missing CSV files:", *[f" - {n}" for n in missing_files], ""] + + if missing_cols: + lines.append("CSV files with missing required columns:") + for name in sorted(missing_cols.keys()): + cols = missing_cols[name] + lines.append(f" - {name}: missing {', '.join(cols)}") + found = found_cols.get(name) + if found: + lines.append(f" Found columns: {', '.join(found)}") + lines.append("") + + lines += [ + "Fix: export the required CSV(s) and ensure the header names match the Track D contract.", + "Tip: compare your exported CSV headers against the workbook downloads.", + ] + return "\n".join(lines) + + +def validate_dataset(datadir: PathLike | None, *, profile: str = "full") -> dict[str, Any]: + """Validate a Track D dataset folder against a profile. + + Parameters + ---------- + datadir: + Folder containing CSV inputs. + profile: + One of: core_gl, ar, full. 
+ + Returns + ------- + dict + A report dict with keys: + ok, profile, datadir, missing_files, missing_columns, found_columns. + + Raises + ------ + TrackDDataError + If *datadir* is missing or not a directory. + TrackDSchemaError + If required files or required columns are missing. + """ + + root = resolve_datadir(datadir) + + p = (profile or "").strip().lower() + try: + schemas = schemas_for_profile(p) + except ValueError as e: + raise TrackDDataError( + f"Unknown profile: {profile}.\n" + f"Use one of: {', '.join(ALLOWED_PROFILES)}" + ) from e + + report: dict[str, Any] = { + "ok": True, + "profile": p, + "datadir": str(root), + "missing_files": [], + "missing_columns": {}, + "found_columns": {}, + } + + missing_files: list[str] = [] + missing_columns: dict[str, list[str]] = {} + found_columns: dict[str, list[str]] = {} + + for schema in schemas: + table_path = root / schema.name + if not table_path.exists(): + missing_files.append(schema.name) + continue + + cols = _read_header(table_path) + found_columns[schema.name] = cols + missing = [c for c in schema.required_columns if c not in set(cols)] + if missing: + missing_columns[schema.name] = missing + + if missing_files or missing_columns: + report["ok"] = False + report["missing_files"] = missing_files + report["missing_columns"] = missing_columns + report["found_columns"] = found_columns + raise TrackDSchemaError(_format_report(report)) + + report["missing_files"] = [] + report["missing_columns"] = {} + report["found_columns"] = found_columns + return report diff --git a/tests/test_trackd_validate_cli.py b/tests/test_trackd_validate_cli.py new file mode 100644 index 0000000..c672347 --- /dev/null +++ b/tests/test_trackd_validate_cli.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from pathlib import Path + +from pystatsv1.cli import main + + +def _write_chart_of_accounts(p: Path, *, missing_normal_side: bool = False) -> None: + # Minimal valid header set for the contract. 
+ cols = ["account_id", "account_name", "account_type"] + if not missing_normal_side: + cols.append("normal_side") + row = "1,Cash,asset" + ("" if missing_normal_side else ",debit") + p.write_text(",".join(cols) + "\n" + row + "\n", encoding="utf-8") + + +def _write_gl_journal(p: Path, *, missing_credit: bool = False) -> None: + cols = ["txn_id", "date", "doc_id", "description", "account_id", "debit"] + if not missing_credit: + cols.append("credit") + row = "t1,2025-01-01,d1,Example,1,100" + ("" if missing_credit else ",0") + p.write_text(",".join(cols) + "\n" + row + "\n", encoding="utf-8") + + +def test_trackd_validate_missing_datadir_is_friendly(tmp_path: Path, capsys) -> None: + missing = tmp_path / "nope" + rc = main(["trackd", "validate", "--datadir", str(missing), "--profile", "core_gl"]) + out = capsys.readouterr().out + + assert rc == 1 + assert "Data directory not found" in out + assert "Hint:" in out + + +def test_trackd_validate_missing_csv_is_friendly(tmp_path: Path, capsys) -> None: + # Only chart_of_accounts.csv exists; gl_journal.csv is missing. + _write_chart_of_accounts(tmp_path / "chart_of_accounts.csv") + + rc = main(["trackd", "validate", "--datadir", str(tmp_path), "--profile", "core_gl"]) + out = capsys.readouterr().out + + assert rc == 1 + assert "Track D dataset validation failed" in out + assert "Profile: core_gl" in out + assert "Missing CSV files" in out + assert "gl_journal.csv" in out + + +def test_trackd_validate_missing_required_columns_is_friendly(tmp_path: Path, capsys) -> None: + _write_chart_of_accounts(tmp_path / "chart_of_accounts.csv") + _write_gl_journal(tmp_path / "gl_journal.csv", missing_credit=True) + + rc = main(["trackd", "validate", "--datadir", str(tmp_path), "--profile", "core_gl"]) + out = capsys.readouterr().out + + assert rc == 1 + assert "CSV files with missing required columns" in out + assert "gl_journal.csv" in out + assert "credit" in out + assert "Found columns" in out