diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7a60b85 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.pyc diff --git a/mgtp/decision_record.py b/mgtp/decision_record.py new file mode 100644 index 0000000..4abac27 --- /dev/null +++ b/mgtp/decision_record.py @@ -0,0 +1,157 @@ +"""decision_record — Deterministic, time-bounded, evidenced decision artefact. + +Canonical serialisation rules: +- Fixed field order (explicit tuple _CANONICAL_FIELDS). +- Stable JSON: UTF-8, separators=(",", ":"), sort_keys=True. +- Stable list ordering: reason_codes and evidence refs sorted before serialisation. +- decision_hash = sha256(canonical_bytes()) as lowercase hex. + +Fail-closed rules (enforced by evaluate()): +- decision_time outside [authority_window_start, authority_window_end] + => verdict forced to REFUSE, reason_code "outside_authority_window" appended. +- verdict == ALLOW and provided_evidence is None + => verdict forced to REFUSE, reason_code "evidence_missing" appended. +""" + +import hashlib +import json +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + +from .types import EvidenceRef, Verdict + +_CANONICAL_FIELDS = ( + "record_id", + "decision_time", + "authority_window_start", + "authority_window_end", + "verdict", + "reason_codes", + "provided_evidence", +) + +OUTSIDE_WINDOW_REASON = "outside_authority_window" +EVIDENCE_MISSING_REASON = "evidence_missing" + + +def _parse_rfc3339(ts: str) -> datetime: + """Parse an RFC3339 / ISO-8601 timestamp string; 'Z' suffix accepted.""" + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + + +@dataclass(frozen=True) +class DecisionRecord: + """Immutable decision artefact with deterministic serialisation. + + Fields + ------ + record_id Unique identifier for this decision. + decision_time UTC timestamp of the decision (RFC3339). + authority_window_start Start of the valid authority window (RFC3339). + authority_window_end End of the valid authority window (RFC3339). + verdict One of Verdict.{ALLOW, REFUSE, SUPERVISED}. + reason_codes Sorted tuple of reason strings. + provided_evidence Sorted tuple of EvidenceRef, or None. + """ + + record_id: str + decision_time: str + authority_window_start: str + authority_window_end: str + verdict: str + reason_codes: tuple + provided_evidence: Optional[tuple] + + def canonical_bytes(self) -> bytes: + """Return deterministic UTF-8 JSON bytes for this record. + + - Keys in sort_keys order (alphabetical). + - reason_codes sorted. + - provided_evidence list sorted by ref_id (or None). + - No whitespace in separators. + """ + evidence_json: Optional[list] = None + if self.provided_evidence is not None: + evidence_json = sorted( + [{"description": e.description, "ref_id": e.ref_id} for e in self.provided_evidence], + key=lambda x: x["ref_id"], + ) + obj = { + "record_id": self.record_id, + "decision_time": self.decision_time, + "authority_window_start": self.authority_window_start, + "authority_window_end": self.authority_window_end, + "verdict": self.verdict, + "reason_codes": sorted(self.reason_codes), + "provided_evidence": evidence_json, + } + return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + @property + def decision_hash(self) -> str: + """SHA-256 hex digest (lowercase) of canonical_bytes().""" + return hashlib.sha256(self.canonical_bytes()).hexdigest() + + +def evaluate( + record_id: str, + decision_time: str, + authority_window_start: str, + authority_window_end: str, + requested_verdict: str, + reason_codes: list, + provided_evidence: Optional[list], +) -> "DecisionRecord": + """Build a DecisionRecord, applying fail-closed enforcement. + + Enforcement (in order): + 1. If decision_time is outside [authority_window_start, authority_window_end] + the verdict is forced to REFUSE and OUTSIDE_WINDOW_REASON is appended. + 2. If the resulting verdict is ALLOW and provided_evidence is None + the verdict is forced to REFUSE and EVIDENCE_MISSING_REASON is appended. + + Parameters + ---------- + record_id Unique record identifier. + decision_time UTC timestamp of decision (RFC3339). + authority_window_start Window start (RFC3339). + authority_window_end Window end (RFC3339). + requested_verdict Caller-supplied verdict string (Verdict member value). + reason_codes List of reason strings. + provided_evidence List of EvidenceRef, or None. + + Returns + ------- + DecisionRecord with final, enforced verdict. + """ + verdict = requested_verdict + codes = list(reason_codes) + + dt = _parse_rfc3339(decision_time) + window_start = _parse_rfc3339(authority_window_start) + window_end = _parse_rfc3339(authority_window_end) + + if not (window_start <= dt <= window_end): + verdict = Verdict.REFUSE.value + if OUTSIDE_WINDOW_REASON not in codes: + codes.append(OUTSIDE_WINDOW_REASON) + + if verdict == Verdict.ALLOW.value and provided_evidence is None: + verdict = Verdict.REFUSE.value + if EVIDENCE_MISSING_REASON not in codes: + codes.append(EVIDENCE_MISSING_REASON) + + evidence_tuple: Optional[tuple] = None + if provided_evidence is not None: + evidence_tuple = tuple(sorted(provided_evidence, key=lambda e: e.ref_id)) + + return DecisionRecord( + record_id=record_id, + decision_time=decision_time, + authority_window_start=authority_window_start, + authority_window_end=authority_window_end, + verdict=verdict, + reason_codes=tuple(sorted(codes)), + provided_evidence=evidence_tuple, + ) diff --git a/mgtp/types.py b/mgtp/types.py index c5da10f..307e394 100644 --- a/mgtp/types.py +++ b/mgtp/types.py @@ -3,6 +3,18 @@ from typing import Optional +class Verdict(str, Enum): + ALLOW = "ALLOW" + REFUSE = "REFUSE" + SUPERVISED = "SUPERVISED" + + +@dataclass(frozen=True) +class EvidenceRef: + ref_id: str + description: str + + class RiskClass(str, Enum): LOW = "LOW" MEDIUM = "MEDIUM" diff --git a/tests/fixtures/golden_decision.json b/tests/fixtures/golden_decision.json new file mode 100644 index 0000000..2bed22e --- /dev/null +++ b/tests/fixtures/golden_decision.json @@ -0,0 +1,14 @@ +{ + "description": "Golden fixture for DecisionRecord determinism / cross-process replay test.", + "inputs": { + "record_id": "rec-golden-001", + "decision_time": "2026-02-28T12:00:00Z", + "authority_window_start": "2026-02-28T00:00:00Z", + "authority_window_end": "2026-02-28T23:59:59Z", + "verdict": "ALLOW", + "reason_codes": ["allowlist_match"], + "provided_evidence": [{"ref_id": "ev-001", "description": "owner confirmed"}] + }, + "golden_canonical_b64": "eyJhdXRob3JpdHlfd2luZG93X2VuZCI6IjIwMjYtMDItMjhUMjM6NTk6NTlaIiwiYXV0aG9yaXR5X3dpbmRvd19zdGFydCI6IjIwMjYtMDItMjhUMDA6MDA6MDBaIiwiZGVjaXNpb25fdGltZSI6IjIwMjYtMDItMjhUMTI6MDA6MDBaIiwicHJvdmlkZWRfZXZpZGVuY2UiOlt7ImRlc2NyaXB0aW9uIjoib3duZXIgY29uZmlybWVkIiwicmVmX2lkIjoiZXYtMDAxIn1dLCJyZWFzb25fY29kZXMiOlsiYWxsb3dsaXN0X21hdGNoIl0sInJlY29yZF9pZCI6InJlYy1nb2xkZW4tMDAxIiwidmVyZGljdCI6IkFMTE9XIn0=", + "golden_hash": "eddc5350bb3f9f584d7c52cbf940bb4e294573e8c5ddf7f12d9f5ca99150bf78" +} diff --git a/tests/test_decision_record_determinism.py b/tests/test_decision_record_determinism.py new file mode 100644 index 0000000..85d2f63 --- /dev/null +++ b/tests/test_decision_record_determinism.py @@ -0,0 +1,222 @@ +"""Tests for DecisionRecord: determinism, golden replay, time-bound, evidence, surface-area guard.""" + +import base64 +import hashlib +import json +from pathlib import Path + +from mgtp.decision_record import ( + EVIDENCE_MISSING_REASON, + OUTSIDE_WINDOW_REASON, + DecisionRecord, + evaluate, +) +from mgtp.types import EvidenceRef + +_FIXTURE_PATH = Path(__file__).parent / "fixtures" / "golden_decision.json" + +_GOLDEN_RECORD = DecisionRecord( + record_id="rec-golden-001", + decision_time="2026-02-28T12:00:00Z", + authority_window_start="2026-02-28T00:00:00Z", + authority_window_end="2026-02-28T23:59:59Z", + verdict="ALLOW", + reason_codes=("allowlist_match",), + provided_evidence=(EvidenceRef(ref_id="ev-001", description="owner confirmed"),), +) + + +# --------------------------------------------------------------------------- +# 1. Determinism: N=50 identical calls must yield identical bytes and hash +# --------------------------------------------------------------------------- + +def test_canonical_bytes_determinism_n50(): + """canonical_bytes() is byte-identical across 50 calls.""" + first = _GOLDEN_RECORD.canonical_bytes() + for _ in range(49): + assert _GOLDEN_RECORD.canonical_bytes() == first, "canonical_bytes() not deterministic" + + +def test_decision_hash_determinism_n50(): + """decision_hash is identical across 50 calls.""" + first = _GOLDEN_RECORD.decision_hash + for _ in range(49): + assert _GOLDEN_RECORD.decision_hash == first, "decision_hash not deterministic" + assert first == first.lower(), "decision_hash must be lowercase hex" + assert len(first) == 64, "decision_hash must be 64-char sha256 hex" + + +# --------------------------------------------------------------------------- +# 2. Cross-process reproducibility: match stored golden fixture +# --------------------------------------------------------------------------- + +def test_golden_replay(): + """Canonical bytes and hash match the stored golden fixture.""" + fixture = json.loads(_FIXTURE_PATH.read_text(encoding="utf-8")) + canonical = _GOLDEN_RECORD.canonical_bytes() + assert base64.b64encode(canonical).decode() == fixture["golden_canonical_b64"], ( + "canonical_bytes() does not match golden fixture" + ) + assert _GOLDEN_RECORD.decision_hash == fixture["golden_hash"], ( + "decision_hash does not match golden fixture" + ) + + +# --------------------------------------------------------------------------- +# 3. Time-bound enforcement +# --------------------------------------------------------------------------- + +def test_decision_time_before_window_refused(): + """decision_time before window_start => verdict REFUSE.""" + rec = evaluate( + record_id="rec-tb-001", + decision_time="2026-02-27T23:59:59Z", # 1 second before window + authority_window_start="2026-02-28T00:00:00Z", + authority_window_end="2026-02-28T23:59:59Z", + requested_verdict="ALLOW", + reason_codes=["allowlist_match"], + provided_evidence=[EvidenceRef(ref_id="ev-001", description="owner confirmed")], + ) + assert rec.verdict == "REFUSE" + assert OUTSIDE_WINDOW_REASON in rec.reason_codes + + +def test_decision_time_after_window_refused(): + """decision_time after window_end => verdict REFUSE.""" + rec = evaluate( + record_id="rec-tb-002", + decision_time="2026-03-01T00:00:00Z", # 1 day after window + authority_window_start="2026-02-28T00:00:00Z", + authority_window_end="2026-02-28T23:59:59Z", + requested_verdict="ALLOW", + reason_codes=["allowlist_match"], + provided_evidence=[EvidenceRef(ref_id="ev-001", description="owner confirmed")], + ) + assert rec.verdict == "REFUSE" + assert OUTSIDE_WINDOW_REASON in rec.reason_codes + + +def test_decision_time_inside_window_accepted(): + """decision_time inside window leaves verdict unchanged.""" + rec = evaluate( + record_id="rec-tb-003", + decision_time="2026-02-28T12:00:00Z", + authority_window_start="2026-02-28T00:00:00Z", + authority_window_end="2026-02-28T23:59:59Z", + requested_verdict="ALLOW", + reason_codes=["allowlist_match"], + provided_evidence=[EvidenceRef(ref_id="ev-001", description="owner confirmed")], + ) + assert rec.verdict == "ALLOW" + assert OUTSIDE_WINDOW_REASON not in rec.reason_codes + + +# --------------------------------------------------------------------------- +# 4. Evidence enforcement (fail-closed) +# --------------------------------------------------------------------------- + +def test_allow_without_evidence_fails_closed(): + """ALLOW with provided_evidence=None => REFUSE with evidence_missing reason.""" + rec = evaluate( + record_id="rec-ev-001", + decision_time="2026-02-28T12:00:00Z", + authority_window_start="2026-02-28T00:00:00Z", + authority_window_end="2026-02-28T23:59:59Z", + requested_verdict="ALLOW", + reason_codes=["allowlist_match"], + provided_evidence=None, + ) + assert rec.verdict == "REFUSE" + assert EVIDENCE_MISSING_REASON in rec.reason_codes + + +def test_refuse_without_evidence_allowed(): + """REFUSE verdict does not require evidence; no evidence_missing injected.""" + rec = evaluate( + record_id="rec-ev-002", + decision_time="2026-02-28T12:00:00Z", + authority_window_start="2026-02-28T00:00:00Z", + authority_window_end="2026-02-28T23:59:59Z", + requested_verdict="REFUSE", + reason_codes=["denylist_match"], + provided_evidence=None, + ) + assert rec.verdict == "REFUSE" + assert EVIDENCE_MISSING_REASON not in rec.reason_codes + + +def test_supervised_without_evidence_allowed(): + """SUPERVISED verdict does not require evidence.""" + rec = evaluate( + record_id="rec-ev-003", + decision_time="2026-02-28T12:00:00Z", + authority_window_start="2026-02-28T00:00:00Z", + authority_window_end="2026-02-28T23:59:59Z", + requested_verdict="SUPERVISED", + reason_codes=["escalation_match"], + provided_evidence=None, + ) + assert rec.verdict == "SUPERVISED" + assert EVIDENCE_MISSING_REASON not in rec.reason_codes + + +# --------------------------------------------------------------------------- +# 5. Surface-area guard: forbidden files must not be modified +# --------------------------------------------------------------------------- + +_REPO_ROOT = Path(__file__).resolve().parent.parent + +_FORBIDDEN_HASHES = { + "authority_gate.py": "78975c58f28c95bdb111f787b8edec58c2bdbdd132e2ea7c8e7b7c1e8e67e6f5", + "stop_machine.py": "473da80d555daf7883bfbe84a24c54660e9f844a6fa8d11d1f9ce68e91a41c5c", +} + + +def _sha256_file(path: Path) -> str: + return hashlib.sha256(path.read_bytes()).hexdigest() + + +def test_surface_area_guard_authority_gate(): + """authority_gate.py must not be modified (hash guard).""" + path = _REPO_ROOT / "authority_gate.py" + assert path.exists(), "authority_gate.py missing" + assert _sha256_file(path) == _FORBIDDEN_HASHES["authority_gate.py"], ( + "authority_gate.py has been modified – revert it" + ) + + +def test_surface_area_guard_stop_machine(): + """stop_machine.py must not be modified (hash guard).""" + path = _REPO_ROOT / "stop_machine.py" + assert path.exists(), "stop_machine.py missing" + assert _sha256_file(path) == _FORBIDDEN_HASHES["stop_machine.py"], ( + "stop_machine.py has been modified – revert it" + ) + + +def test_surface_area_guard_commit_gate_dir(): + """commit_gate/ directory must not be modified (hash guard on each .py file).""" + _COMMIT_GATE_HASHES = { + "commit_gate/tests/test_drift.py": + "df741d98527925a4b2c581c6ee60ca648e8584b3a46fb64cc3cc77fc20605221", + "commit_gate/tests/test_determinism.py": + "b3538e34e6d14778fc5c5250b4f8cd8857cac093e0ff10fa76edb0bab8e09f87", + "commit_gate/src/commit_gate/canonicalise.py": + "69c5e87b7492dd9083f2a309c55d46fa96d47ff67ab019e56b53ad9b3d65ba67", + "commit_gate/src/commit_gate/__init__.py": + "11c733509357bd55131ab6a33a9f3324bcd540ad470ef203ceb58ac81b5d92ff", + "commit_gate/src/commit_gate/cli.py": + "34a06af33216d8190f3df691f0bdae43567f2356f60f942c0b0c0e18cb88a55f", + "commit_gate/src/commit_gate/engine.py": + "0c3849e4843aa0ae3bbfbe49c738e969ab2f48d4a00c8745173ec362fc600011", + "commit_gate/src/commit_gate/drift.py": + "9e4da9eda0cd74a9a9542a14417d098af82abd06d88c8173970d98fbf4c3ebfb", + "commit_gate/src/commit_gate/__main__.py": + "f5bd098fa98ac8da4a5090aaeec471f40b82fbb0ec399922416f6301c681f88c", + } + for rel, expected_hash in _COMMIT_GATE_HASHES.items(): + path = _REPO_ROOT / rel + assert path.exists(), f"{rel} missing from commit_gate" + assert _sha256_file(path) == expected_hash, ( + f"{rel} has been modified – revert it" + )