From 60ac70372d0817d57d3380a7c3fb148e5cdc8f97 Mon Sep 17 00:00:00 2001
From: aworki <1224518406@qq.com>
Date: Thu, 5 Mar 2026 00:05:52 +0800
Subject: [PATCH] feat(audit): add stability audit mode with active-skill filtering

---
 scripts/audit.py | 222 ++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 199 insertions(+), 23 deletions(-)

diff --git a/scripts/audit.py b/scripts/audit.py
index 59d501c..62fd51c 100755
--- a/scripts/audit.py
+++ b/scripts/audit.py
@@ -1,18 +1,17 @@
 #!/usr/bin/env python3
-"""Security audit for skills in the Skill Evolution registry.
-
-Scans all skills (or specific ones) against security rules.
-Skills that pass get marked with audited_at timestamp.
-Skills that fail get audited_at cleared.
-
-Designed to run periodically (e.g. via cron or scheduler).
-Requires SUPABASE_SERVICE_KEY (admin-only operation).
+"""Security + stability audit for skills.
+
+Modes:
+1) Security audit (default): scan registry skills via Supabase.
+2) Stability audit (--stability-audit): scan local git history for skills/* A->D churn.
 """
 
 import argparse
 import json
 import re
+import subprocess
 import sys
+import time
 import urllib.parse
 from pathlib import Path
 
@@ -22,10 +21,22 @@
 
 
 def parse_args():
-    p = argparse.ArgumentParser(description="Audit skills for security issues")
+    p = argparse.ArgumentParser(description="Audit skills for security/stability issues")
     p.add_argument("--name", default=None, help="Audit a specific skill by name (default: all)")
     p.add_argument("--dry-run", action="store_true", help="Show results without updating database")
     p.add_argument("--verbose", "-v", action="store_true", help="Show detailed findings per skill")
+
+    # Stability audit mode
+    p.add_argument("--stability-audit", action="store_true", help="Run git-based stability audit for skills/*")
+    p.add_argument("--repo", default=str(Path(__file__).resolve().parents[1]), help="Repo root for git scan")
+    p.add_argument("--days", type=int, default=7, help="Lookback window in days for stability audit")
+    p.add_argument("--top", type=int, default=1, help="Top N churn candidates to output")
+    p.add_argument("--report-file", default=None, help="Optional path to write markdown audit section")
+    p.add_argument(
+        "--include-deleted",
+        action="store_true",
+        help="Include skills that are no longer present under skills/ (default: active skills only)",
+    )
     return p.parse_args()
 
 
@@ -109,7 +120,7 @@ def audit_skill(skill):
         matches = re.findall(pattern, content, re.IGNORECASE)
         for m in matches:
             # Skip false positives
-            if any(fp in m for fp in ["SUPABASE", "TASKPOOL", "$HOME", "${HOME}", "os.environ", "os.getenv"]):
+            if any(fp in m for fp in ["SUPABASE", "TASKPOOL", "${HOME}", "os.environ", "os.getenv"]):
                 continue
             findings.append(f"FAIL [{rel_path}]: {desc} — {m[:50]}...")
 
@@ -141,9 +152,149 @@ def audit_skill(skill):
     return not has_fail, findings
 
 
-def main():
-    args = parse_args()
+def _run_git(repo, args):
+    cmd = ["git", "-C", repo, *args]
+    return subprocess.check_output(cmd, text=True, stderr=subprocess.DEVNULL)
+
+
+def _list_active_skills(repo):
+    skills_dir = Path(repo) / "skills"
+    if not skills_dir.exists():
+        return set()
+    return {
+        p.name
+        for p in skills_dir.iterdir()
+        if p.is_dir() and not p.name.startswith(".")
+    }
+
+
+def run_stability_audit(repo, days, top, report_file=None, include_deleted=False):
+    """Scan git history and find skills with A->D churn within lookback window."""
+    try:
+        output = _run_git(
+            repo,
+            [
+                "log",
+                f"--since={days} days ago",
+                "--name-status",
+                "--pretty=format:__COMMIT__%H|%ct|%s",
+                "--",
+                "skills/",
+            ],
+        )
+    except (subprocess.CalledProcessError, OSError) as e:
+        raise RuntimeError(f"ERROR: git log scan failed: {e}") from e
+
+    skills = {}
+    current = None
+
+    for raw in output.splitlines():
+        line = raw.strip()
+        if not line:
+            continue
+        if line.startswith("__COMMIT__"):
+            meta = line.replace("__COMMIT__", "", 1)
+            parts = meta.split("|", 2)
+            if len(parts) < 3:
+                current = None
+                continue
+            current = {"hash": parts[0], "ts": int(parts[1]), "subject": parts[2]}
+            continue
+        if current is None:
+            continue
+
+        fields = line.split("\t")
+        if len(fields) < 2:
+            continue
+        status = fields[0][0]  # A/M/D/R...
+        path = fields[-1]
+
+        if not path.startswith("skills/"):
+            continue
+
+        parts = path.split("/")
+        if len(parts) < 3:
+            continue
+        skill = parts[1]
+
+        rec = skills.setdefault(
+            skill,
+            {
+                "skill": skill,
+                "adds": 0,
+                "deletes": 0,
+                "last_add": None,
+                "last_delete": None,
+                "latest_ts": 0,
+            },
+        )
+
+        if status == "A":
+            rec["adds"] += 1
+            if rec["last_add"] is None or current["ts"] > rec["last_add"]["ts"]:
+                rec["last_add"] = current
+        elif status == "D":
+            rec["deletes"] += 1
+            if rec["last_delete"] is None or current["ts"] > rec["last_delete"]["ts"]:
+                rec["last_delete"] = current
+
+        if current["ts"] > rec["latest_ts"]:
+            rec["latest_ts"] = current["ts"]
+
+    churn_candidates = [v for v in skills.values() if v["adds"] > 0 and v["deletes"] > 0]
+
+    active_skills = _list_active_skills(repo)
+    if include_deleted:
+        candidates = churn_candidates
+    else:
+        candidates = [c for c in churn_candidates if c["skill"] in active_skills]
+
+    candidates.sort(key=lambda x: (x["adds"] + x["deletes"], x["latest_ts"]), reverse=True)
+    top_candidates = candidates[: max(1, top)]
+
+    result = {
+        "status": "ok",
+        "mode": "stability",
+        "lookback_days": days,
+        "generated_at": int(time.time()),
+        "skills_scanned": len(skills),
+        "active_skills": len(active_skills),
+        "churn_candidates_total": len(churn_candidates),
+        "churn_candidates": len(candidates),
+        "filtered_out_removed": max(0, len(churn_candidates) - len(candidates)),
+        "include_deleted": include_deleted,
+        "top": top_candidates,
+    }
+
+    if report_file:
+        p = Path(report_file)
+        p.parent.mkdir(parents=True, exist_ok=True)
+        with p.open("w", encoding="utf-8") as f:
+            f.write("## Stability Audit Candidates\n\n")
+            f.write(f"- Lookback: {days} days\n")
+            scope = "skills/* (active only)" if not include_deleted else "skills/* (including deleted)"
+            f.write(f"- Scope: {scope}\n")
+            f.write(f"- A→D churn candidates: {len(candidates)}")
+            if not include_deleted:
+                f.write(f" (filtered removed: {max(0, len(churn_candidates) - len(candidates))})")
+            f.write("\n\n")
+            if top_candidates:
+                t = top_candidates[0]
+                f.write(f"### Top1: `{t['skill']}`\n")
+                f.write(f"- Change counts: A={t['adds']} / D={t['deletes']}\n")
+                if t.get("last_add"):
+                    f.write(f"- Recent add: {t['last_add']['hash'][:8]} · {t['last_add']['subject']}\n")
+                if t.get("last_delete"):
+                    f.write(f"- Recent delete: {t['last_delete']['hash'][:8]} · {t['last_delete']['subject']}\n")
+                f.write("- Suggestion: prioritize this skill in today's improvement recommendation.\n")
+            else:
+                f.write("- No A→D churn candidates detected today.\n")
+
+    return result
+
+
+def run_security_audit(args):
     # Fetch skills (service key to read file_tree which may not be exposed via anon)
     select = "id,name,variant,description,author,skill_md,file_tree,audited_at"
     if args.name:
@@ -182,24 +333,49 @@
 
         # Update database
         if not args.dry_run:
-            supabase_rpc("audit_skill", {
-                "p_skill_id": skill["id"],
-                "p_passed": passed,
-            }, service_key=True, exit_on_error=False)
+            supabase_rpc(
+                "audit_skill",
+                {
+                    "p_skill_id": skill["id"],
+                    "p_passed": passed,
+                },
+                service_key=True,
+                exit_on_error=False,
+            )
 
         status = "PASS" if passed else "FAIL"
         print(f"  {status}: {label} ({len(findings)} findings)", file=sys.stderr)
 
     if args.dry_run:
         print(json.dumps(results, indent=2, ensure_ascii=False))
     else:
-        summary = f"Audit complete: {results['passed']}/{results['total']} passed, {results['failed']} failed"
-        print(summary, file=sys.stderr)
-        print(json.dumps({
-            "status": "ok",
-            "total": results["total"],
-            "passed": results["passed"],
-            "failed": results["failed"],
-        }, ensure_ascii=False))
+        print(
+            json.dumps(
+                {
+                    "status": "ok",
+                    "total": results["total"],
+                    "passed": results["passed"],
+                    "failed": results["failed"],
+                },
+                ensure_ascii=False,
+            )
+        )
+
+
+def main():
+    args = parse_args()
+
+    if args.stability_audit:
+        result = run_stability_audit(
+            repo=args.repo,
+            days=max(1, args.days),
+            top=max(1, args.top),
+            report_file=args.report_file,
+            include_deleted=args.include_deleted,
+        )
+        print(json.dumps(result, ensure_ascii=False))
+        return
+
+    run_security_audit(args)
 
 
 if __name__ == "__main__":