From 94190d5dbe7b77464bbf65a0080e4f3493e30632 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 15 Mar 2026 21:39:23 +0000 Subject: [PATCH] Add real admission data pipeline and model calibration engine - core/admission_data.py: CSV loader with GPA normalization (4/4.3/5/100 scales), background tier classification, internship scoring, and per-program statistics with feature importance analysis - core/calibrator.py: Calibration engine that computes data-driven GPA thresholds, predicts outcomes, evaluates model accuracy, and generates school_ranker overrides - Updated school_ranker to accept calibration overrides for data-driven reach/target/safety classification - CLI: added 'stats' and 'calibrate' commands - data/admissions/sample.csv: 30 sample records across 11 programs - 45 new tests (218 total), all passing; ruff clean https://claude.ai/code/session_014dkZ9Eq3DPVaUfRTeN2HXp --- cli/main.py | 189 ++++++++++++ core/admission_data.py | 575 +++++++++++++++++++++++++++++++++++ core/calibrator.py | 401 ++++++++++++++++++++++++ core/school_ranker.py | 75 +++-- data/admissions/sample.csv | 31 ++ data/admissions/template.csv | 1 + tests/test_admission_data.py | 284 +++++++++++++++++ tests/test_calibrator.py | 227 ++++++++++++++ 8 files changed, 1766 insertions(+), 17 deletions(-) create mode 100644 core/admission_data.py create mode 100644 core/calibrator.py create mode 100644 data/admissions/sample.csv create mode 100644 data/admissions/template.csv create mode 100644 tests/test_admission_data.py create mode 100644 tests/test_calibrator.py diff --git a/cli/main.py b/cli/main.py index 8aa9f80..d1a6acc 100644 --- a/cli/main.py +++ b/cli/main.py @@ -8,6 +8,8 @@ from rich.table import Table from rich.text import Text +from core.admission_data import load_admission_csv, load_all_admission_data, summarize_records +from core.calibrator import calibrate_all, generate_ranker_overrides from core.data_loader import load_all_programs, load_profile from core.gap_advisor import 
analyze_gaps from core.interview_prep import ( @@ -610,6 +612,180 @@ def cmd_gaps(args: argparse.Namespace) -> None: console.print() +def cmd_stats(args: argparse.Namespace) -> None: + """Show statistics from real admission data.""" + if args.file: + records = load_admission_csv(args.file) + else: + records = load_all_admission_data() + + if not records: + console.print("[yellow]No admission data found.[/yellow]") + console.print("[dim]Add CSV files to data/admissions/ or use --file.[/dim]") + return + + summary = summarize_records(records) + + console.print() + console.print( + Panel( + f"[bold]{summary['total_records']}[/bold] records from " + f"[bold]{summary['unique_applicants']}[/bold] applicants | " + f"Sources: {', '.join(summary['sources'])} | " + f"Seasons: {', '.join(summary['seasons'])}", + title="Admission Data Statistics", + border_style="cyan", + ) + ) + + # Per-program breakdown + table = Table(border_style="cyan", title="Per-Program Breakdown") + table.add_column("Program", style="bold") + table.add_column("Accepted", style="green", justify="right") + table.add_column("Rejected", style="red", justify="right") + table.add_column("Waitlisted", style="yellow", justify="right") + table.add_column("Total", justify="right") + table.add_column("Obs. 
Rate", justify="right") + + for prog_id, counts in sorted(summary["programs"].items()): + acc = counts.get("accepted", 0) + rej = counts.get("rejected", 0) + wl = counts.get("waitlisted", 0) + total = acc + rej + wl + decided = acc + rej + rate = f"{acc / decided:.0%}" if decided > 0 else "N/A" + table.add_row(prog_id, str(acc), str(rej), str(wl), str(total), rate) + + console.print(table) + console.print() + + # GPA distribution + from core.admission_data import compute_all_program_stats + + all_stats = compute_all_program_stats(records) + if all_stats: + console.print(Panel("Accepted vs Rejected GPA Comparison", border_style="cyan")) + gpa_table = Table(border_style="cyan") + gpa_table.add_column("Program", style="bold") + gpa_table.add_column("Avg GPA (Acc)", style="green", justify="right") + gpa_table.add_column("Avg GPA (Rej)", style="red", justify="right") + gpa_table.add_column("Gap", justify="right") + gpa_table.add_column("Top Feature", justify="right") + + for pid, stats in sorted(all_stats.items()): + if stats.accepted == 0: + continue + gpa_acc = f"{stats.avg_gpa_accepted:.2f}" if stats.avg_gpa_accepted else "N/A" + gpa_rej = f"{stats.avg_gpa_rejected:.2f}" if stats.avg_gpa_rejected else "N/A" + gap = "" + if stats.avg_gpa_accepted and stats.avg_gpa_rejected: + diff = stats.avg_gpa_accepted - stats.avg_gpa_rejected + gap = f"+{diff:.2f}" if diff >= 0 else f"{diff:.2f}" + + top_feat = "" + if stats.feature_importance: + top = max(stats.feature_importance, key=lambda k: abs(stats.feature_importance[k])) + top_feat = f"{top} ({stats.feature_importance[top]:.2f})" + + gpa_table.add_row(pid, gpa_acc, gpa_rej, gap, top_feat) + + console.print(gpa_table) + console.print() + + +def cmd_calibrate(args: argparse.Namespace) -> None: + """Calibrate scoring model using real admission data.""" + if args.file: + records = load_admission_csv(args.file) + else: + records = load_all_admission_data() + + if not records: + console.print("[yellow]No admission data 
found.[/yellow]") + console.print("[dim]Add CSV files to data/admissions/ or use --file.[/dim]") + return + + console.print() + console.print(Panel("Running Calibration...", border_style="cyan")) + + result = calibrate_all(records) + + # Thresholds table + table = Table(border_style="cyan", title="Calibrated Program Thresholds") + table.add_column("Program", style="bold") + table.add_column("GPA Floor", justify="right") + table.add_column("GPA Target", justify="right") + table.add_column("GPA Safe", justify="right") + table.add_column("Obs. Rate", justify="right") + table.add_column("Samples", justify="right") + table.add_column("Confidence") + + for pid, threshold in sorted(result.program_thresholds.items()): + conf_color = {"high": "green", "medium": "yellow", "low": "red"}.get( + threshold.confidence, "white" + ) + table.add_row( + pid, + f"{threshold.gpa_floor:.2f}", + f"{threshold.gpa_target:.2f}", + f"{threshold.gpa_safe:.2f}", + f"{threshold.observed_acceptance_rate:.0%}", + str(threshold.sample_size), + f"[{conf_color}]{threshold.confidence}[/{conf_color}]", + ) + + console.print(table) + + # Global feature weights + if result.global_feature_weights: + console.print() + console.print(Panel("Global Feature Importance", border_style="cyan")) + fw_table = Table(border_style="cyan") + fw_table.add_column("Feature", style="bold") + fw_table.add_column("Weight", justify="right") + fw_table.add_column("Bar", width=20) + + for feat, weight in result.global_feature_weights.items(): + bar_len = round(weight * 40) + fw_table.add_row(feat, f"{weight:.1%}", "█" * bar_len) + + console.print(fw_table) + + # Accuracy report + acc = result.accuracy_report + if acc.get("total_predictions", 0) > 0: + console.print() + accuracy_pct = acc.get("accuracy", 0) + acc_color = "green" if accuracy_pct >= 0.7 else "yellow" if accuracy_pct >= 0.5 else "red" + console.print( + f" [bold]Model Accuracy:[/bold] [{acc_color}]{accuracy_pct:.0%}[/{acc_color}] " + f"({acc['correct']} correct / 
{acc['correct'] + acc['incorrect']} decided, " + f"{acc['borderline']} borderline)" + ) + + # Recommendations + if result.recommendations: + console.print() + console.print(Panel("Recommendations", border_style="yellow")) + for rec in result.recommendations: + console.print(f" - {rec}") + + console.print() + + # If --apply flag, show the overrides that would be applied + if args.apply: + overrides = generate_ranker_overrides(result) + if overrides: + console.print(Panel("Ranker Overrides (Applied)", border_style="green")) + for pid, ov in sorted(overrides.items()): + console.print( + f" {pid}: reach<{ov['reach_gpa_threshold']:.2f} " + f"safe>={ov['safety_gpa_threshold']:.2f} " + f"[dim](n={ov['sample_size']}, {ov['confidence']})[/dim]" + ) + console.print() + + def main() -> None: parser = argparse.ArgumentParser( prog="quantpath", @@ -675,6 +851,17 @@ def main() -> None: p_gaps = subparsers.add_parser("gaps", help="Analyze profile gaps and suggest improvements") p_gaps.add_argument("--profile", "-p", required=True, help="Path to profile YAML") + # stats (real data) + p_stats = subparsers.add_parser("stats", help="Show statistics from real admission data") + p_stats.add_argument("--file", "-f", help="Path to a specific CSV file (default: all)") + + # calibrate (real data) + p_cal = subparsers.add_parser("calibrate", help="Calibrate model using real admission data") + p_cal.add_argument("--file", "-f", help="Path to a specific CSV file (default: all)") + p_cal.add_argument( + "--apply", action="store_true", help="Show ranker overrides that would be applied" + ) + args = parser.parse_args() if args.command is None: @@ -690,6 +877,8 @@ def main() -> None: "compare": cmd_compare, "interview": cmd_interview, "gaps": cmd_gaps, + "stats": cmd_stats, + "calibrate": cmd_calibrate, } commands[args.command](args) diff --git a/core/admission_data.py b/core/admission_data.py new file mode 100644 index 0000000..5741bac --- /dev/null +++ b/core/admission_data.py @@ -0,0 +1,575 @@ 
+"""Real admission data loader and normalizer. + +Loads CSV files of real applicant outcomes and normalizes fields +(GPA scales, background types, etc.) into a uniform format for +calibration and statistical analysis. + +CSV schema +---------- +id, bg_type, gpa, gpa_scale, gre, toefl, major, intern_desc, +has_paper, has_research, courses_note, program, result, season, source +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Optional + +# --------------------------------------------------------------------------- +# Paths +# --------------------------------------------------------------------------- + +_PACKAGE_ROOT = Path(__file__).resolve().parent.parent +_ADMISSIONS_DIR = _PACKAGE_ROOT / "data" / "admissions" + +# --------------------------------------------------------------------------- +# GPA normalization +# --------------------------------------------------------------------------- + +# Supported scales: 4, 4.3, 5, 100 +_GPA_SCALE_TO_4: dict[float, list[tuple[float, float, float]]] = { + # (threshold, mapped_start, mapped_end) — piecewise linear + # Scale 100 -> 4.0 + 100: [ + (95, 3.9, 4.0), + (90, 3.7, 3.9), + (85, 3.3, 3.7), + (80, 3.0, 3.3), + (75, 2.7, 3.0), + (70, 2.3, 2.7), + (60, 1.7, 2.3), + (0, 0.0, 1.7), + ], + # Scale 5 -> 4.0 + 5: [ + (4.8, 3.9, 4.0), + (4.5, 3.7, 3.9), + (4.0, 3.3, 3.7), + (3.5, 3.0, 3.3), + (3.0, 2.5, 3.0), + (0, 0.0, 2.5), + ], + # Scale 4.3 -> 4.0 (cap at 4.0) + 4.3: [ + (4.0, 3.9, 4.0), + (3.7, 3.7, 3.9), + (3.3, 3.3, 3.7), + (3.0, 3.0, 3.3), + (0, 0.0, 3.0), + ], +} + + +def normalize_gpa(gpa: float, scale: float) -> float: + """Normalize a GPA value to the 4.0 scale. + + Parameters + ---------- + gpa: + The raw GPA value. + scale: + The GPA scale (4, 4.3, 5, or 100). + + Returns + ------- + float + GPA normalized to 0.0-4.0 range. 
+ """ + if scale == 4: + return min(4.0, gpa) + + breakpoints = _GPA_SCALE_TO_4.get(scale) + if breakpoints is None: + # Unknown scale — attempt linear conversion + return min(4.0, gpa * 4.0 / scale) + + for threshold, mapped_lo, mapped_hi in breakpoints: + if gpa >= threshold: + # Find the top of this segment + # For the highest segment, cap at max GPA + seg_top = scale if breakpoints[0] == (threshold, mapped_lo, mapped_hi) else threshold + # Use previous segment's threshold as the top + idx = breakpoints.index((threshold, mapped_lo, mapped_hi)) + if idx == 0: + seg_top = scale + else: + seg_top = breakpoints[idx - 1][0] + + if seg_top == threshold: + return mapped_hi + + frac = (gpa - threshold) / (seg_top - threshold) + return mapped_lo + frac * (mapped_hi - mapped_lo) + + return 0.0 + + +# --------------------------------------------------------------------------- +# Background type classification +# --------------------------------------------------------------------------- + +# Tier mapping for Chinese university classification +BG_TIER_MAP: dict[str, int] = { + # Tier 1: Top overseas / C9 / Peking/Tsinghua + "海本(Top10)": 1, + "海本(Top15)": 1, + "海本(Top20)": 1, + "C9": 1, + # Tier 2: Strong overseas / top 985 + "海本(Top30)": 2, + "海本(Top50)": 2, + "985": 2, + # Tier 3: 211 / strong finance schools + "两财一贸(211)": 3, + "两财一贸": 3, + "211": 3, + # Tier 4: Other + "双非一本": 4, + "双非": 5, + "海本(Top100)": 3, + "海本": 3, +} + + +def classify_background(bg_type: str) -> int: + """Map a background type string to a tier (1=strongest, 5=weakest). + + Performs fuzzy matching against known background categories. 
+ """ + bg_clean = bg_type.strip().replace(" ", "") + + # Exact match first + if bg_clean in BG_TIER_MAP: + return BG_TIER_MAP[bg_clean] + + # Partial match + for key, tier in BG_TIER_MAP.items(): + if key in bg_clean or bg_clean in key: + return tier + + # Keywords + lower = bg_clean.lower() + if "top10" in lower or "top15" in lower: + return 1 + if "top20" in lower or "top30" in lower or "985" in lower or "c9" in lower: + return 2 + if "211" in lower or "top50" in lower or "财" in lower or "贸" in lower: + return 3 + if "海本" in lower: + return 3 + if "双非" in lower: + return 4 + + return 4 # default + + +# --------------------------------------------------------------------------- +# Intern strength scoring +# --------------------------------------------------------------------------- + + +def score_internships(intern_desc: str) -> float: + """Score internship description on a 0-10 scale. + + Heuristic scoring based on keywords: + - Number of internships + - Quality indicators (顶级, top, 百亿, 头部) + - Type indicators (量化, quant, 投行, IB, 对冲, hedge fund) + """ + if not intern_desc or intern_desc.strip() in ("", "无", "N/A"): + return 0.0 + + desc = intern_desc.lower() + score = 0.0 + + # Count internships (Chinese: 段) + for char in "段": + count = desc.count(char) + if count > 0: + # Extract number before 段 + for i, c in enumerate(desc): + if c == "段": + if i > 0 and desc[i - 1].isdigit(): + n = int(desc[i - 1]) + score += min(n * 1.5, 5.0) + break + + # Quality keywords (Chinese + English) + quality_keywords = { + "顶级": 2.0, "top": 1.5, "百亿": 1.5, "头部": 1.5, + "一线": 1.0, "知名": 0.8, "大型": 0.5, + } + for kw, pts in quality_keywords.items(): + if kw in desc: + score += pts + + # Type keywords + type_keywords = { + "量化": 1.5, "quant": 1.5, "投行": 1.5, "ib": 1.0, + "对冲": 1.5, "hedge": 1.5, "私募": 1.0, "qr": 1.0, + "trading": 1.0, "研究": 0.8, "金工": 0.8, + "三中一华": 2.0, "高盛": 2.0, "goldman": 2.0, + "摩根": 2.0, "morgan": 1.5, "kaggle": 1.5, + } + for kw, pts in type_keywords.items(): 
+ if kw in desc: + score += pts + + return min(10.0, score) + + +# --------------------------------------------------------------------------- +# Data model +# --------------------------------------------------------------------------- + + +@dataclass +class AdmissionRecord: + """A single real applicant data point with normalized fields.""" + + id: str = "" + bg_type: str = "" + bg_tier: int = 4 # 1-5, computed from bg_type + gpa_raw: float = 0.0 + gpa_scale: float = 4.0 + gpa_normalized: float = 0.0 # on 4.0 scale + gre: Optional[int] = None + toefl: Optional[int] = None + major: str = "" + intern_desc: str = "" + intern_score: float = 0.0 # 0-10 computed score + has_paper: Optional[bool] = None + has_research: Optional[bool] = None + courses_note: str = "" + program: str = "" + result: str = "" # accepted / rejected / waitlisted + season: str = "" + source: str = "" + + +@dataclass +class ProgramStats: + """Aggregated statistics for a program from real data.""" + + program_id: str = "" + total_records: int = 0 + accepted: int = 0 + rejected: int = 0 + waitlisted: int = 0 + + # Accepted applicant stats + avg_gpa_accepted: float = 0.0 + avg_gre_accepted: float = 0.0 + avg_bg_tier_accepted: float = 0.0 + avg_intern_score_accepted: float = 0.0 + paper_rate_accepted: float = 0.0 + research_rate_accepted: float = 0.0 + + # Rejected applicant stats + avg_gpa_rejected: float = 0.0 + avg_gre_rejected: float = 0.0 + + # Computed metrics + observed_acceptance_rate: float = 0.0 + + # Feature importance (correlation with acceptance) + feature_importance: dict[str, float] = field(default_factory=dict) + + +# --------------------------------------------------------------------------- +# CSV loading +# --------------------------------------------------------------------------- + + +def _parse_bool(val: str) -> Optional[bool]: + """Parse a boolean field that may be '是'/'否'/'不明'/etc.""" + val = val.strip().lower() + if val in ("是", "yes", "true", "1", "有"): + return True + if val 
in ("否", "no", "false", "0", "无"): + return False + return None # unknown + + +def _parse_int(val: str) -> Optional[int]: + """Parse an integer, stripping non-numeric suffixes like '+'.""" + val = val.strip().rstrip("+").rstrip("分") + if not val or val.lower() in ("", "n/a", "无", "不明"): + return None + try: + return int(val) + except ValueError: + return None + + +def _parse_float(val: str) -> float: + """Parse a float value, defaulting to 0.0.""" + val = val.strip() + if not val or val.lower() in ("n/a", "无", "不明"): + return 0.0 + try: + return float(val) + except ValueError: + return 0.0 + + +def load_admission_csv(path: str | Path) -> list[AdmissionRecord]: + """Load admission records from a CSV file. + + Parameters + ---------- + path: + Path to the CSV file. + + Returns + ------- + list[AdmissionRecord] + Parsed and normalized records. Only includes records with + result in ('accepted', 'rejected', 'waitlisted'). + """ + filepath = Path(path) + if not filepath.exists(): + raise FileNotFoundError(f"Admission data file not found: {filepath}") + + records: list[AdmissionRecord] = [] + + with open(filepath, "r", encoding="utf-8") as fh: + reader = csv.DictReader(fh) + for row in reader: + result = row.get("result", "").strip().lower() + if result not in ("accepted", "rejected", "waitlisted"): + continue + + gpa_raw = _parse_float(row.get("gpa", "0")) + gpa_scale = _parse_float(row.get("gpa_scale", "4")) + if gpa_scale == 0: + gpa_scale = 4.0 + + bg_type = row.get("bg_type", "").strip() + + rec = AdmissionRecord( + id=row.get("id", "").strip(), + bg_type=bg_type, + bg_tier=classify_background(bg_type), + gpa_raw=gpa_raw, + gpa_scale=gpa_scale, + gpa_normalized=normalize_gpa(gpa_raw, gpa_scale), + gre=_parse_int(row.get("gre", "")), + toefl=_parse_int(row.get("toefl", "")), + major=row.get("major", "").strip(), + intern_desc=row.get("intern_desc", "").strip(), + intern_score=score_internships(row.get("intern_desc", "")), + has_paper=_parse_bool(row.get("has_paper", 
"")), + has_research=_parse_bool(row.get("has_research", "")), + courses_note=row.get("courses_note", "").strip(), + program=row.get("program", "").strip(), + result=result, + season=row.get("season", "").strip(), + source=row.get("source", "").strip(), + ) + records.append(rec) + + return records + + +def load_all_admission_data() -> list[AdmissionRecord]: + """Load all CSV files from the ``data/admissions/`` directory. + + Skips the template file. Returns combined records from all CSVs. + """ + if not _ADMISSIONS_DIR.is_dir(): + return [] + + all_records: list[AdmissionRecord] = [] + for csv_path in sorted(_ADMISSIONS_DIR.glob("*.csv")): + if csv_path.stem == "template": + continue + all_records.extend(load_admission_csv(csv_path)) + return all_records + + +# --------------------------------------------------------------------------- +# Statistics computation +# --------------------------------------------------------------------------- + + +def _safe_avg(values: list[float]) -> float: + """Average of a list, returning 0.0 for empty lists.""" + return sum(values) / len(values) if values else 0.0 + + +def compute_program_stats( + records: list[AdmissionRecord], + program_id: str, +) -> ProgramStats: + """Compute aggregate statistics for a single program. + + Parameters + ---------- + records: + All admission records (will be filtered to program_id). + program_id: + The program ID to compute stats for. + + Returns + ------- + ProgramStats + Aggregated statistics including acceptance rates, average + GPA/GRE for accepted vs rejected, and feature importance. 
+ """ + prog_records = [r for r in records if r.program == program_id] + if not prog_records: + return ProgramStats(program_id=program_id) + + accepted = [r for r in prog_records if r.result == "accepted"] + rejected = [r for r in prog_records if r.result == "rejected"] + waitlisted = [r for r in prog_records if r.result == "waitlisted"] + + stats = ProgramStats( + program_id=program_id, + total_records=len(prog_records), + accepted=len(accepted), + rejected=len(rejected), + waitlisted=len(waitlisted), + ) + + # Accepted stats + if accepted: + stats.avg_gpa_accepted = _safe_avg([r.gpa_normalized for r in accepted]) + gre_vals = [r.gre for r in accepted if r.gre is not None] + stats.avg_gre_accepted = _safe_avg(gre_vals) if gre_vals else 0.0 + stats.avg_bg_tier_accepted = _safe_avg([float(r.bg_tier) for r in accepted]) + stats.avg_intern_score_accepted = _safe_avg([r.intern_score for r in accepted]) + paper_known = [r for r in accepted if r.has_paper is not None] + stats.paper_rate_accepted = ( + sum(1 for r in paper_known if r.has_paper) / len(paper_known) + if paper_known + else 0.0 + ) + research_known = [r for r in accepted if r.has_research is not None] + stats.research_rate_accepted = ( + sum(1 for r in research_known if r.has_research) / len(research_known) + if research_known + else 0.0 + ) + + # Rejected stats + if rejected: + stats.avg_gpa_rejected = _safe_avg([r.gpa_normalized for r in rejected]) + gre_vals_rej = [r.gre for r in rejected if r.gre is not None] + stats.avg_gre_rejected = _safe_avg(gre_vals_rej) if gre_vals_rej else 0.0 + + # Observed acceptance rate + decided = len(accepted) + len(rejected) + stats.observed_acceptance_rate = len(accepted) / decided if decided > 0 else 0.0 + + # Feature importance (simple correlation: avg_accepted vs avg_rejected) + if accepted and rejected: + stats.feature_importance = _compute_feature_importance(accepted, rejected) + + return stats + + +def _compute_feature_importance( + accepted: list[AdmissionRecord], + 
rejected: list[AdmissionRecord], +) -> dict[str, float]: + """Compute simple feature importance as effect size between groups. + + Uses the difference of means normalized by pooled std as a proxy + for feature discriminative power. Higher absolute value = more important. + """ + import math + + features: dict[str, float] = {} + + def _effect_size(acc_vals: list[float], rej_vals: list[float]) -> float: + if not acc_vals or not rej_vals: + return 0.0 + mean_a = sum(acc_vals) / len(acc_vals) + mean_r = sum(rej_vals) / len(rej_vals) + var_a = sum((v - mean_a) ** 2 for v in acc_vals) / max(len(acc_vals), 1) + var_r = sum((v - mean_r) ** 2 for v in rej_vals) / max(len(rej_vals), 1) + pooled_std = math.sqrt((var_a + var_r) / 2) or 1.0 + return (mean_a - mean_r) / pooled_std + + # GPA (normalized) + features["gpa"] = _effect_size( + [r.gpa_normalized for r in accepted], + [r.gpa_normalized for r in rejected], + ) + + # GRE + acc_gre = [float(r.gre) for r in accepted if r.gre is not None] + rej_gre = [float(r.gre) for r in rejected if r.gre is not None] + features["gre"] = _effect_size(acc_gre, rej_gre) + + # Background tier (inverted: lower tier = better) + features["bg_tier"] = -_effect_size( + [float(r.bg_tier) for r in accepted], + [float(r.bg_tier) for r in rejected], + ) + + # Intern score + features["intern"] = _effect_size( + [r.intern_score for r in accepted], + [r.intern_score for r in rejected], + ) + + # Paper + features["paper"] = _effect_size( + [1.0 if r.has_paper else 0.0 for r in accepted if r.has_paper is not None], + [1.0 if r.has_paper else 0.0 for r in rejected if r.has_paper is not None], + ) + + # Research + features["research"] = _effect_size( + [1.0 if r.has_research else 0.0 for r in accepted if r.has_research is not None], + [1.0 if r.has_research else 0.0 for r in rejected if r.has_research is not None], + ) + + return features + + +def compute_all_program_stats( + records: list[AdmissionRecord], +) -> dict[str, ProgramStats]: + """Compute stats 
for every program found in the records. + + Returns + ------- + dict[str, ProgramStats] + Mapping of program_id to ProgramStats. + """ + program_ids = sorted({r.program for r in records if r.program}) + return {pid: compute_program_stats(records, pid) for pid in program_ids} + + +def summarize_records(records: list[AdmissionRecord]) -> dict[str, Any]: + """Generate a high-level summary of the admission dataset. + + Returns + ------- + dict + Summary with total counts, program breakdown, season info, etc. + """ + programs = {} + for r in records: + if r.program not in programs: + programs[r.program] = {"accepted": 0, "rejected": 0, "waitlisted": 0} + programs[r.program][r.result] = programs[r.program].get(r.result, 0) + 1 + + seasons = sorted({r.season for r in records if r.season}) + sources = sorted({r.source for r in records if r.source}) + + return { + "total_records": len(records), + "unique_applicants": len({r.id for r in records}), + "programs": programs, + "seasons": seasons, + "sources": sources, + "avg_gpa_normalized": _safe_avg([r.gpa_normalized for r in records]), + "gre_available": sum(1 for r in records if r.gre is not None), + } diff --git a/core/calibrator.py b/core/calibrator.py new file mode 100644 index 0000000..1a3b1a5 --- /dev/null +++ b/core/calibrator.py @@ -0,0 +1,401 @@ +"""Calibration engine — tunes scoring weights using real admission outcomes. + +Uses real admission data (accepted/rejected) to: +1. Compute per-program acceptance thresholds +2. Adjust the school ranker's reach/target/safety classification +3. Estimate feature importance for admission decisions +4. Generate accuracy metrics for the current scoring model + +The calibration uses logistic-style scoring — no external ML libraries needed. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from .admission_data import ( + AdmissionRecord, + ProgramStats, + compute_all_program_stats, +) + +# --------------------------------------------------------------------------- +# Calibrated thresholds +# --------------------------------------------------------------------------- + + +@dataclass +class ProgramThreshold: + """Calibrated thresholds for a single program.""" + + program_id: str = "" + + # GPA thresholds (on 4.0 scale) + gpa_floor: float = 0.0 # below this -> almost certainly rejected + gpa_target: float = 0.0 # above this -> competitive + gpa_safe: float = 0.0 # above this -> strong safety + + # Background tier threshold + max_bg_tier_accepted: int = 5 # highest tier (worst) still accepted + + # Intern score threshold + min_intern_score_accepted: float = 0.0 + + # Observed rates + observed_acceptance_rate: float = 0.0 + + # Confidence (based on sample size) + sample_size: int = 0 + confidence: str = "low" # low / medium / high + + # Feature weights for this program + feature_weights: dict[str, float] = field(default_factory=dict) + + +@dataclass +class CalibrationResult: + """Output of the full calibration process.""" + + program_thresholds: dict[str, ProgramThreshold] = field(default_factory=dict) + global_feature_weights: dict[str, float] = field(default_factory=dict) + accuracy_report: dict[str, Any] = field(default_factory=dict) + recommendations: list[str] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Core calibration +# --------------------------------------------------------------------------- + + +def _confidence_level(n: int) -> str: + """Determine confidence based on sample size.""" + if n >= 30: + return "high" + if n >= 10: + return "medium" + return "low" + + +def calibrate_program( + stats: ProgramStats, + records: list[AdmissionRecord], +) -> 
ProgramThreshold: + """Calibrate thresholds for a single program based on real data. + + Parameters + ---------- + stats: + Pre-computed program statistics. + records: + All records (filtered internally to this program). + + Returns + ------- + ProgramThreshold + Data-driven thresholds for classification decisions. + """ + prog_records = [r for r in records if r.program == stats.program_id] + accepted = [r for r in prog_records if r.result == "accepted"] + + threshold = ProgramThreshold( + program_id=stats.program_id, + sample_size=stats.total_records, + confidence=_confidence_level(stats.total_records), + observed_acceptance_rate=stats.observed_acceptance_rate, + ) + + if accepted: + # GPA floor: minimum GPA among accepted applicants + gpas_accepted = [r.gpa_normalized for r in accepted] + threshold.gpa_floor = min(gpas_accepted) + threshold.gpa_target = sum(gpas_accepted) / len(gpas_accepted) + # Safe threshold: 90th percentile of accepted + sorted_gpas = sorted(gpas_accepted) + p90_idx = int(len(sorted_gpas) * 0.9) + threshold.gpa_safe = sorted_gpas[min(p90_idx, len(sorted_gpas) - 1)] + + # Background tier + threshold.max_bg_tier_accepted = max(r.bg_tier for r in accepted) + + # Intern score + intern_scores = [r.intern_score for r in accepted] + threshold.min_intern_score_accepted = min(intern_scores) + + if stats.feature_importance: + threshold.feature_weights = stats.feature_importance + + return threshold + + +def calibrate_all( + records: list[AdmissionRecord], +) -> CalibrationResult: + """Run full calibration across all programs in the dataset. + + Parameters + ---------- + records: + All admission records loaded from CSV. + + Returns + ------- + CalibrationResult + Thresholds, global weights, accuracy metrics, and recommendations. 
+ """ + all_stats = compute_all_program_stats(records) + + program_thresholds: dict[str, ProgramThreshold] = {} + for pid, stats in all_stats.items(): + program_thresholds[pid] = calibrate_program(stats, records) + + # Global feature weights (average across programs with enough data) + global_weights = _compute_global_weights(program_thresholds) + + # Accuracy of current classification + accuracy = _evaluate_accuracy(records, program_thresholds) + + # Recommendations + recommendations = _generate_recommendations( + program_thresholds, all_stats, accuracy + ) + + return CalibrationResult( + program_thresholds=program_thresholds, + global_feature_weights=global_weights, + accuracy_report=accuracy, + recommendations=recommendations, + ) + + +def _compute_global_weights( + thresholds: dict[str, ProgramThreshold], +) -> dict[str, float]: + """Average feature weights across programs with sufficient data.""" + feature_sums: dict[str, float] = {} + feature_counts: dict[str, int] = {} + + for pt in thresholds.values(): + if pt.confidence in ("medium", "high") and pt.feature_weights: + for feat, weight in pt.feature_weights.items(): + feature_sums[feat] = feature_sums.get(feat, 0.0) + abs(weight) + feature_counts[feat] = feature_counts.get(feat, 0) + 1 + + if not feature_sums: + return {} + + # Normalize to sum to 1 + raw = { + feat: feature_sums[feat] / feature_counts[feat] + for feat in feature_sums + } + total = sum(raw.values()) or 1.0 + return {feat: round(val / total, 3) for feat, val in sorted(raw.items(), key=lambda x: -x[1])} + + +# --------------------------------------------------------------------------- +# Accuracy evaluation +# --------------------------------------------------------------------------- + + +def predict_outcome( + record: AdmissionRecord, + threshold: ProgramThreshold, +) -> str: + """Predict admission outcome based on calibrated thresholds. + + Returns 'accepted', 'rejected', or 'borderline'. 
+ """ + score = 0.0 + max_score = 0.0 + + # GPA component (40%) + weight_gpa = 0.4 + max_score += weight_gpa + if threshold.gpa_target > 0: + gpa_ratio = record.gpa_normalized / threshold.gpa_target + score += weight_gpa * min(1.0, gpa_ratio) + + # Background tier (25%) + weight_bg = 0.25 + max_score += weight_bg + if threshold.max_bg_tier_accepted > 0: + bg_ratio = 1.0 - (record.bg_tier - 1) / 4.0 # tier 1=1.0, tier 5=0.0 + score += weight_bg * max(0.0, bg_ratio) + + # Intern score (20%) + weight_intern = 0.2 + max_score += weight_intern + if record.intern_score > 0: + score += weight_intern * min(1.0, record.intern_score / 8.0) + + # Research/paper bonus (15%) + weight_research = 0.15 + max_score += weight_research + bonus = 0.0 + if record.has_paper: + bonus += 0.5 + if record.has_research: + bonus += 0.5 + score += weight_research * bonus + + # Classify based on score ratio + ratio = score / max_score if max_score > 0 else 0.0 + + # Adjust threshold by program selectivity + accept_threshold = 0.55 + if threshold.observed_acceptance_rate < 0.15: + accept_threshold = 0.65 + elif threshold.observed_acceptance_rate > 0.50: + accept_threshold = 0.45 + + if ratio >= accept_threshold: + return "accepted" + if ratio >= accept_threshold - 0.15: + return "borderline" + return "rejected" + + +def _evaluate_accuracy( + records: list[AdmissionRecord], + thresholds: dict[str, ProgramThreshold], +) -> dict[str, Any]: + """Evaluate prediction accuracy against actual outcomes.""" + results: dict[str, Any] = { + "total_predictions": 0, + "correct": 0, + "incorrect": 0, + "borderline": 0, + "per_program": {}, + } + + for record in records: + if record.program not in thresholds: + continue + if record.result == "waitlisted": + continue + + threshold = thresholds[record.program] + predicted = predict_outcome(record, threshold) + + results["total_predictions"] += 1 + + if predicted == "borderline": + results["borderline"] += 1 + elif predicted == record.result: + results["correct"] 
+= 1 + else: + results["incorrect"] += 1 + + # Per-program tracking + if record.program not in results["per_program"]: + results["per_program"][record.program] = { + "correct": 0, "incorrect": 0, "borderline": 0, "total": 0, + } + prog_stats = results["per_program"][record.program] + prog_stats["total"] += 1 + if predicted == "borderline": + prog_stats["borderline"] += 1 + elif predicted == record.result: + prog_stats["correct"] += 1 + else: + prog_stats["incorrect"] += 1 + + decided = results["correct"] + results["incorrect"] + results["accuracy"] = results["correct"] / decided if decided > 0 else 0.0 + + return results + + +# --------------------------------------------------------------------------- +# Recommendations +# --------------------------------------------------------------------------- + + +def _generate_recommendations( + thresholds: dict[str, ProgramThreshold], + all_stats: dict[str, ProgramStats], + accuracy: dict[str, Any], +) -> list[str]: + """Generate actionable recommendations from calibration results.""" + recs: list[str] = [] + + # Data quantity check + total = sum(t.sample_size for t in thresholds.values()) + low_data = [pid for pid, t in thresholds.items() if t.confidence == "low"] + + if total < 50: + recs.append( + f"Dataset has only {total} records. Collect more data for " + "reliable calibration (target: 100+ records, 30+ per program)." + ) + + if low_data: + recs.append( + f"Low confidence for {len(low_data)} programs: " + f"{', '.join(low_data[:5])}. Need 10+ records per program." + ) + + # Accuracy check + acc = accuracy.get("accuracy", 0) + if acc < 0.6: + recs.append( + f"Current model accuracy is {acc:.0%}. Consider adding " + "more features (coursework detail, recommendation quality)." + ) + elif acc >= 0.8: + recs.append( + f"Model accuracy is {acc:.0%} — strong predictive power. " + "Continue collecting data to maintain reliability." 
+ ) + + # Feature insights + for pid, threshold in thresholds.items(): + if threshold.feature_weights: + fw = threshold.feature_weights + top_feat = max(fw, key=lambda k: abs(fw[k])) + if abs(threshold.feature_weights[top_feat]) > 1.0: + recs.append( + f"{pid}: '{top_feat}' is the strongest differentiator " + f"(effect size: {threshold.feature_weights[top_feat]:.2f})." + ) + + return recs + + +# --------------------------------------------------------------------------- +# Integration: generate school_ranker overrides +# --------------------------------------------------------------------------- + + +def generate_ranker_overrides( + calibration: CalibrationResult, +) -> dict[str, dict[str, Any]]: + """Generate per-program overrides for the school ranker. + + These overrides can replace the hardcoded thresholds in + ``school_ranker._classify()`` with data-driven values. + + Returns + ------- + dict[str, dict] + Mapping of program_id to override dict with keys: + ``reach_gpa_threshold``, ``safety_gpa_threshold``, + ``observed_acceptance_rate``. + """ + overrides: dict[str, dict[str, Any]] = {} + + for pid, threshold in calibration.program_thresholds.items(): + if threshold.confidence == "low" and threshold.sample_size < 5: + continue + + overrides[pid] = { + "reach_gpa_threshold": round(threshold.gpa_target, 2), + "safety_gpa_threshold": round(threshold.gpa_safe, 2), + "observed_acceptance_rate": round(threshold.observed_acceptance_rate, 3), + "gpa_floor": round(threshold.gpa_floor, 2), + "confidence": threshold.confidence, + "sample_size": threshold.sample_size, + } + + return overrides diff --git a/core/school_ranker.py b/core/school_ranker.py index 6397026..42c98c8 100644 --- a/core/school_ranker.py +++ b/core/school_ranker.py @@ -6,11 +6,14 @@ - Prerequisite match score - Programme acceptance rate - Overall evaluation score from the profile evaluator + +When calibration data is available, data-driven thresholds override the +default heuristic rules. 
""" from __future__ import annotations -from typing import Any +from typing import Any, Optional from .models import EvaluationResult, ProgramData, UserProfile from .prerequisite_matcher import match_prerequisites @@ -24,18 +27,43 @@ def _classify( user_gpa: float, program_avg_gpa: float, acceptance_rate: float, + overrides: Optional[dict[str, Any]] = None, ) -> str: """Classify a programme as reach, target, or safety. - Rules (applied in order): + When *overrides* are provided (from calibration), uses data-driven + GPA thresholds instead of the default heuristic rules. + + Default rules (applied in order): 1. If acceptance_rate < 0.08 OR user_gpa < program_avg_gpa: ``"reach"`` 2. If acceptance_rate > 0.15 AND user_gpa >= program_avg_gpa + 0.1: ``"safety"`` 3. Otherwise: ``"target"`` + + Calibrated rules (when overrides provided): + 1. If user_gpa < gpa_floor: ``"reach"`` + 2. If user_gpa >= safety_gpa_threshold: ``"safety"`` + 3. If user_gpa >= reach_gpa_threshold: ``"target"`` + 4. Otherwise: ``"reach"`` """ - acceptance_rate = acceptance_rate or 0.15 # default to moderate if unknown - program_avg_gpa = program_avg_gpa or 3.80 # default if unknown + # Use calibrated thresholds when available + if overrides and overrides.get("confidence") in ("medium", "high"): + gpa_floor = overrides.get("gpa_floor", 0) + reach_threshold = overrides.get("reach_gpa_threshold", program_avg_gpa) + safety_threshold = overrides.get("safety_gpa_threshold", program_avg_gpa + 0.1) + + if user_gpa < gpa_floor: + return "reach" + if user_gpa >= safety_threshold: + return "safety" + if user_gpa >= reach_threshold: + return "target" + return "reach" + + # Fallback: default heuristic rules + acceptance_rate = acceptance_rate or 0.15 + program_avg_gpa = program_avg_gpa or 3.80 if acceptance_rate < 0.08 or user_gpa < program_avg_gpa: return "reach" if acceptance_rate > 0.15 and user_gpa >= program_avg_gpa + 0.1: @@ -112,6 +140,7 @@ def rank_schools( profile: UserProfile, programs: 
list[ProgramData], evaluation: EvaluationResult, + calibration_overrides: Optional[dict[str, dict[str, Any]]] = None, ) -> dict[str, Any]: """Rank and classify a set of programmes for the given applicant. @@ -124,6 +153,9 @@ def rank_schools( evaluation: A pre-computed :class:`EvaluationResult` from the profile evaluator. + calibration_overrides: + Optional dict of per-program overrides from the calibrator. + When provided, classification uses data-driven thresholds. Returns ------- @@ -143,16 +175,19 @@ def rank_schools( ``avg_gpa``. """ results: list[dict[str, Any]] = [] + overrides = calibration_overrides or {} for prog in programs: # Prerequisite matching. pmatch = match_prerequisites(profile, prog) - # Classification. + # Classification (with optional data-driven overrides). + prog_overrides = overrides.get(prog.id) category = _classify( user_gpa=profile.gpa, program_avg_gpa=prog.avg_gpa, acceptance_rate=prog.acceptance_rate, + overrides=prog_overrides, ) # Fit score. @@ -164,18 +199,24 @@ def rank_schools( overall_eval_score=evaluation.overall_score, ) - results.append( - { - "program_id": prog.id, - "name": prog.name, - "university": prog.university, - "category": category, - "fit_score": fit, - "prereq_match_score": pmatch.match_score, - "acceptance_rate": prog.acceptance_rate, - "avg_gpa": prog.avg_gpa, - } - ) + result_entry: dict[str, Any] = { + "program_id": prog.id, + "name": prog.name, + "university": prog.university, + "category": category, + "fit_score": fit, + "prereq_match_score": pmatch.match_score, + "acceptance_rate": prog.acceptance_rate, + "avg_gpa": prog.avg_gpa, + } + + # Add calibration info if available + if prog_overrides: + result_entry["calibrated"] = True + result_entry["confidence"] = prog_overrides.get("confidence", "low") + result_entry["sample_size"] = prog_overrides.get("sample_size", 0) + + results.append(result_entry) # Sort each bucket by fit_score descending. 
results.sort(key=lambda r: -r["fit_score"]) diff --git a/data/admissions/sample.csv b/data/admissions/sample.csv new file mode 100644 index 0000000..2e7e071 --- /dev/null +++ b/data/admissions/sample.csv @@ -0,0 +1,31 @@ +id,bg_type,gpa,gpa_scale,gre,toefl,major,intern_desc,has_paper,has_research,courses_note,program,result,season,source +1,两财一贸(211),91.8,100,331,110+,金工,3段量化私募QR(含top百亿)+三中一华金工组,不明,不明,,baruch-mfe,accepted,2025Fall,quantnet +2,两财一贸(211),91.8,100,331,110+,金工,3段量化私募QR(含top百亿)+三中一华金工组,不明,不明,,princeton-mfin,rejected,2025Fall,quantnet +3,两财一贸(211),91.8,100,331,110+,金工,3段量化私募QR(含top百亿)+三中一华金工组,不明,不明,,cmu-mscf,accepted,2025Fall,quantnet +4,两财一贸(211),91.8,100,331,110+,金工,3段量化私募QR(含top百亿)+三中一华金工组,不明,不明,,mit-mfin,rejected,2025Fall,quantnet +5,两财一贸(211),91.8,100,331,110+,金工,3段量化私募QR(含top百亿)+三中一华金工组,不明,不明,,uchicago-msfm,accepted,2025Fall,quantnet +6,两财一贸(211),91.8,100,331,110+,金工,3段量化私募QR(含top百亿)+三中一华金工组,不明,不明,,gatech-qcf,accepted,2025Fall,quantnet +7,两财一贸(211),91.8,100,331,110+,金工,3段量化私募QR(含top百亿)+三中一华金工组,不明,不明,,nus-qf,accepted,2025Fall,quantnet +8,985,3.8,4,332,112,数学,2段量化实习+1段券商研究,是,是,实分析+随机过程+C++,baruch-mfe,accepted,2025Fall,chasedream +9,985,3.8,4,332,112,数学,2段量化实习+1段券商研究,是,是,实分析+随机过程+C++,princeton-mfin,rejected,2025Fall,chasedream +10,985,3.8,4,332,112,数学,2段量化实习+1段券商研究,是,是,实分析+随机过程+C++,cmu-mscf,accepted,2025Fall,chasedream +11,海本(Top30),3.92,4,335,,,2段顶级量化+1段投行,是,否,数学+CS双专业,baruch-mfe,accepted,2025Fall,linkedin +12,海本(Top30),3.92,4,335,,,2段顶级量化+1段投行,是,否,数学+CS双专业,princeton-mfin,accepted,2025Fall,linkedin +13,海本(Top30),3.92,4,335,,,2段顶级量化+1段投行,是,否,数学+CS双专业,cmu-mscf,accepted,2025Fall,linkedin +14,985,3.5,4,325,105,金融,1段银行实习,否,否,,columbia-mafn,rejected,2025Fall,chasedream +15,985,3.5,4,325,105,金融,1段银行实习,否,否,,gatech-qcf,accepted,2025Fall,chasedream +16,211,3.6,4,328,108,统计,2段数据分析实习,否,是,时间序列+回归分析,nyu-mfe,rejected,2025Fall,quantnet +17,211,3.6,4,328,108,统计,2段数据分析实习,否,是,时间序列+回归分析,rutgers-msmf,accepted,2025Fall,quantnet 
+18,211,3.6,4,328,108,统计,2段数据分析实习,否,是,时间序列+回归分析,fordham-msqf,accepted,2025Fall,quantnet +19,985,87,100,330,115,金工,3段量化+1段投行,是,是,随机微积分+实分析+ML,baruch-mfe,accepted,2025Fall,offershow +20,985,87,100,330,115,金工,3段量化+1段投行,是,是,随机微积分+实分析+ML,nyu-mfe,accepted,2025Fall,offershow +21,海本(Top50),3.7,4,329,,,1段量化实习+1段fintech,否,否,CS+数学辅修,columbia-mafn,accepted,2025Fall,offershow +22,海本(Top50),3.7,4,329,,,1段量化实习+1段fintech,否,否,CS+数学辅修,mit-mfin,rejected,2025Fall,offershow +23,双非一本,3.8,4,326,102,应用数学,1段量化实习,否,否,概率论+线代+微积分,rutgers-msmf,accepted,2025Fall,chasedream +24,双非一本,3.8,4,326,102,应用数学,1段量化实习,否,否,概率论+线代+微积分,baruch-mfe,rejected,2025Fall,chasedream +25,985,3.9,4,333,118,计算机,3段量化实习+kaggle金牌,是,是,ML+深度学习+C++,baruch-mfe,accepted,2025Fall,quantnet +26,985,3.9,4,333,118,计算机,3段量化实习+kaggle金牌,是,是,ML+深度学习+C++,cmu-mscf,accepted,2025Fall,quantnet +27,985,3.9,4,333,118,计算机,3段量化实习+kaggle金牌,是,是,ML+深度学习+C++,princeton-mfin,waitlisted,2025Fall,quantnet +28,海本(Top10),3.95,4.3,337,,数学+金融双专业,2段顶级投行+1段对冲基金,是,是,实分析+泛函+随机微积分,princeton-mfin,accepted,2025Fall,linkedin +29,海本(Top10),3.95,4.3,337,,数学+金融双专业,2段顶级投行+1段对冲基金,是,是,实分析+泛函+随机微积分,baruch-mfe,accepted,2025Fall,linkedin +30,海本(Top10),3.95,4.3,337,,数学+金融双专业,2段顶级投行+1段对冲基金,是,是,实分析+泛函+随机微积分,mit-mfin,accepted,2025Fall,linkedin diff --git a/data/admissions/template.csv b/data/admissions/template.csv new file mode 100644 index 0000000..76904e3 --- /dev/null +++ b/data/admissions/template.csv @@ -0,0 +1 @@ +id,bg_type,gpa,gpa_scale,gre,toefl,major,intern_desc,has_paper,has_research,courses_note,program,result,season,source diff --git a/tests/test_admission_data.py b/tests/test_admission_data.py new file mode 100644 index 0000000..35660fa --- /dev/null +++ b/tests/test_admission_data.py @@ -0,0 +1,284 @@ +"""Tests for core.admission_data — CSV loading, GPA normalization, scoring.""" + +from __future__ import annotations + +import csv +import tempfile +from pathlib import Path + +import pytest + +from core.admission_data import ( + AdmissionRecord, 
classify_background, + compute_all_program_stats, + compute_program_stats, + load_admission_csv, + normalize_gpa, + score_internships, + summarize_records, +) + + +# =================================================================== +# GPA normalization +# =================================================================== + + +class TestNormalizeGPA: + """Tests for normalize_gpa().""" + + def test_scale_4_passthrough(self): + assert normalize_gpa(3.8, 4) == 3.8 + + def test_scale_4_caps_at_4(self): + assert normalize_gpa(4.2, 4) == 4.0 + + def test_scale_100_high(self): + """91.8/100 should map to ~3.7-3.9 range.""" + result = normalize_gpa(91.8, 100) + assert 3.7 <= result <= 3.9 + + def test_scale_100_90(self): + result = normalize_gpa(90, 100) + assert 3.7 <= result <= 3.8 + + def test_scale_100_85(self): + result = normalize_gpa(85, 100) + assert 3.3 <= result <= 3.5 + + def test_scale_100_80(self): + result = normalize_gpa(80, 100) + assert 3.0 <= result <= 3.3 + + def test_scale_5_high(self): + result = normalize_gpa(4.5, 5) + assert 3.7 <= result <= 3.9 + + def test_scale_4_3_high(self): + """3.95/4.3 should map close to 3.9+.""" + result = normalize_gpa(3.95, 4.3) + assert result >= 3.85 + + def test_scale_4_3_caps(self): + result = normalize_gpa(4.3, 4.3) + assert result <= 4.0 + + def test_unknown_scale_linear(self): + """Unknown scale should use linear conversion.""" + result = normalize_gpa(8.0, 10) + assert 3.0 <= result <= 3.5 + + +# =================================================================== +# Background classification +# =================================================================== + + +class TestClassifyBackground: + """Tests for classify_background().""" + + @pytest.mark.parametrize( + "bg_type,expected_tier", + [ + ("海本(Top10)", 1), + ("C9", 1), + ("海本(Top30)", 2), + ("985", 2), + ("两财一贸(211)", 3), + ("211", 3), + ("双非一本", 4), + ], + ) + def test_known_types(self, bg_type, expected_tier): + assert classify_background(bg_type) == 
expected_tier + + def test_unknown_defaults_to_4(self): + assert classify_background("其他学校") == 4 + + def test_partial_match(self): + """Should match '985' within a longer string.""" + assert classify_background("某985高校") == 2 + + +# =================================================================== +# Internship scoring +# =================================================================== + + +class TestScoreInternships: + """Tests for score_internships().""" + + def test_empty(self): + assert score_internships("") == 0.0 + assert score_internships("无") == 0.0 + + def test_strong_intern(self): + desc = "3段量化私募QR(含top百亿)+三中一华金工组" + score = score_internships(desc) + assert score >= 5.0 + + def test_weak_intern(self): + desc = "1段银行实习" + score = score_internships(desc) + assert 0 < score < 5.0 + + def test_top_intern(self): + desc = "2段顶级量化+1段投行" + score = score_internships(desc) + assert score >= 4.0 + + def test_capped_at_10(self): + desc = "3段顶级量化私募QR(含top百亿)+三中一华金工组+高盛+对冲基金" + score = score_internships(desc) + assert score <= 10.0 + + +# =================================================================== +# CSV loading +# =================================================================== + + +class TestLoadAdmissionCSV: + """Tests for load_admission_csv().""" + + def _write_csv(self, rows: list[dict], tmp_dir: str) -> str: + path = Path(tmp_dir) / "test.csv" + fieldnames = [ + "id", "bg_type", "gpa", "gpa_scale", "gre", "toefl", "major", + "intern_desc", "has_paper", "has_research", "courses_note", + "program", "result", "season", "source", + ] + with open(path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for row in rows: + writer.writerow(row) + return str(path) + + def test_load_basic(self, tmp_path): + path = self._write_csv( + [ + { + "id": "1", "bg_type": "985", "gpa": "3.8", "gpa_scale": "4", + "gre": "332", "toefl": "112", "major": "数学", + "intern_desc": "2段量化实习", "has_paper": 
"是", + "has_research": "是", "courses_note": "", + "program": "baruch-mfe", "result": "accepted", + "season": "2025Fall", "source": "quantnet", + }, + ], + str(tmp_path), + ) + records = load_admission_csv(path) + assert len(records) == 1 + assert records[0].result == "accepted" + assert records[0].gpa_normalized == 3.8 + assert records[0].bg_tier == 2 + assert records[0].gre == 332 + + def test_skips_pending(self, tmp_path): + path = self._write_csv( + [ + { + "id": "1", "bg_type": "985", "gpa": "3.8", "gpa_scale": "4", + "gre": "", "toefl": "", "major": "", "intern_desc": "", + "has_paper": "", "has_research": "", "courses_note": "", + "program": "baruch-mfe", "result": "pending", + "season": "", "source": "", + }, + ], + str(tmp_path), + ) + records = load_admission_csv(path) + assert len(records) == 0 + + def test_normalizes_100_scale(self, tmp_path): + path = self._write_csv( + [ + { + "id": "1", "bg_type": "211", "gpa": "91.8", "gpa_scale": "100", + "gre": "331", "toefl": "110+", "major": "金工", + "intern_desc": "", "has_paper": "不明", "has_research": "不明", + "courses_note": "", "program": "cmu-mscf", "result": "accepted", + "season": "2025Fall", "source": "test", + }, + ], + str(tmp_path), + ) + records = load_admission_csv(path) + assert len(records) == 1 + assert 3.7 <= records[0].gpa_normalized <= 3.9 + assert records[0].toefl == 110 # stripped '+' + assert records[0].has_paper is None # '不明' -> None + + def test_file_not_found(self): + with pytest.raises(FileNotFoundError): + load_admission_csv("/nonexistent/path.csv") + + +# =================================================================== +# Program statistics +# =================================================================== + + +class TestComputeStats: + """Tests for compute_program_stats() and compute_all_program_stats().""" + + def _make_records(self) -> list[AdmissionRecord]: + return [ + AdmissionRecord( + id="1", program="baruch-mfe", result="accepted", + gpa_normalized=3.8, gre=332, 
bg_tier=2, intern_score=7.0, + has_paper=True, has_research=True, + ), + AdmissionRecord( + id="2", program="baruch-mfe", result="accepted", + gpa_normalized=3.9, gre=335, bg_tier=1, intern_score=8.0, + has_paper=True, has_research=True, + ), + AdmissionRecord( + id="3", program="baruch-mfe", result="rejected", + gpa_normalized=3.5, gre=325, bg_tier=4, intern_score=2.0, + has_paper=False, has_research=False, + ), + AdmissionRecord( + id="4", program="cmu-mscf", result="accepted", + gpa_normalized=3.85, gre=333, bg_tier=2, intern_score=6.0, + has_paper=None, has_research=None, + ), + ] + + def test_program_stats_basic(self): + records = self._make_records() + stats = compute_program_stats(records, "baruch-mfe") + assert stats.total_records == 3 + assert stats.accepted == 2 + assert stats.rejected == 1 + assert stats.observed_acceptance_rate == pytest.approx(2 / 3, rel=0.01) + assert stats.avg_gpa_accepted == pytest.approx(3.85, rel=0.01) + + def test_empty_program(self): + records = self._make_records() + stats = compute_program_stats(records, "nonexistent") + assert stats.total_records == 0 + + def test_all_program_stats(self): + records = self._make_records() + all_stats = compute_all_program_stats(records) + assert "baruch-mfe" in all_stats + assert "cmu-mscf" in all_stats + assert all_stats["baruch-mfe"].accepted == 2 + + def test_feature_importance_computed(self): + records = self._make_records() + stats = compute_program_stats(records, "baruch-mfe") + # Should have feature importance because there are accepted and rejected + assert len(stats.feature_importance) > 0 + assert "gpa" in stats.feature_importance + + def test_summarize(self): + records = self._make_records() + summary = summarize_records(records) + assert summary["total_records"] == 4 + assert "baruch-mfe" in summary["programs"] diff --git a/tests/test_calibrator.py b/tests/test_calibrator.py new file mode 100644 index 0000000..5dd8ff3 --- /dev/null +++ b/tests/test_calibrator.py @@ -0,0 +1,227 @@ 
+"""Tests for core.calibrator — model calibration and accuracy evaluation.""" + +from __future__ import annotations + +import pytest + +from core.admission_data import AdmissionRecord +from core.calibrator import ( + CalibrationResult, + ProgramThreshold, + calibrate_all, + calibrate_program, + generate_ranker_overrides, + predict_outcome, +) +from core.admission_data import compute_program_stats + + +# =================================================================== +# Test fixtures +# =================================================================== + + +def _make_baruch_records() -> list[AdmissionRecord]: + """Simulate Baruch MFE admission data.""" + return [ + # Strong accepted applicants + AdmissionRecord( + id="1", program="baruch-mfe", result="accepted", + gpa_normalized=3.9, bg_tier=1, intern_score=8.0, + gre=335, has_paper=True, has_research=True, + ), + AdmissionRecord( + id="2", program="baruch-mfe", result="accepted", + gpa_normalized=3.8, bg_tier=2, intern_score=7.0, + gre=332, has_paper=True, has_research=True, + ), + AdmissionRecord( + id="3", program="baruch-mfe", result="accepted", + gpa_normalized=3.85, bg_tier=2, intern_score=6.5, + gre=330, has_paper=False, has_research=True, + ), + # Rejected applicants + AdmissionRecord( + id="4", program="baruch-mfe", result="rejected", + gpa_normalized=3.5, bg_tier=4, intern_score=2.0, + gre=325, has_paper=False, has_research=False, + ), + AdmissionRecord( + id="5", program="baruch-mfe", result="rejected", + gpa_normalized=3.6, bg_tier=3, intern_score=3.0, + gre=328, has_paper=False, has_research=False, + ), + ] + + +def _make_mixed_records() -> list[AdmissionRecord]: + """Multiple programs for full calibration.""" + records = _make_baruch_records() + records.extend([ + AdmissionRecord( + id="6", program="cmu-mscf", result="accepted", + gpa_normalized=3.85, bg_tier=2, intern_score=7.0, + gre=333, has_paper=True, has_research=True, + ), + AdmissionRecord( + id="7", program="cmu-mscf", result="rejected", 
+ gpa_normalized=3.4, bg_tier=4, intern_score=1.0, + gre=320, has_paper=False, has_research=False, + ), + AdmissionRecord( + id="8", program="gatech-qcf", result="accepted", + gpa_normalized=3.5, bg_tier=3, intern_score=4.0, + gre=325, has_paper=False, has_research=False, + ), + AdmissionRecord( + id="9", program="gatech-qcf", result="accepted", + gpa_normalized=3.6, bg_tier=2, intern_score=5.0, + gre=328, has_paper=False, has_research=True, + ), + ]) + return records + + +# =================================================================== +# calibrate_program +# =================================================================== + + +class TestCalibrateProgram: + """Tests for calibrate_program().""" + + def test_gpa_thresholds(self): + records = _make_baruch_records() + stats = compute_program_stats(records, "baruch-mfe") + threshold = calibrate_program(stats, records) + + # GPA floor should be min GPA of accepted (3.8) + assert threshold.gpa_floor == pytest.approx(3.8, rel=0.01) + # GPA target should be average of accepted + assert 3.8 <= threshold.gpa_target <= 3.9 + # Safe threshold should be high + assert threshold.gpa_safe >= 3.85 + + def test_acceptance_rate(self): + records = _make_baruch_records() + stats = compute_program_stats(records, "baruch-mfe") + threshold = calibrate_program(stats, records) + assert threshold.observed_acceptance_rate == pytest.approx(3 / 5, rel=0.01) + + def test_confidence_level(self): + records = _make_baruch_records() + stats = compute_program_stats(records, "baruch-mfe") + threshold = calibrate_program(stats, records) + assert threshold.confidence == "low" # only 5 records + + def test_bg_tier(self): + records = _make_baruch_records() + stats = compute_program_stats(records, "baruch-mfe") + threshold = calibrate_program(stats, records) + assert threshold.max_bg_tier_accepted == 2 + + def test_empty_program(self): + records = _make_baruch_records() + stats = compute_program_stats(records, "nonexistent") + threshold = 
calibrate_program(stats, records) + assert threshold.sample_size == 0 + + +# =================================================================== +# calibrate_all +# =================================================================== + + +class TestCalibrateAll: + """Tests for calibrate_all().""" + + def test_all_programs_calibrated(self): + records = _make_mixed_records() + result = calibrate_all(records) + + assert isinstance(result, CalibrationResult) + assert "baruch-mfe" in result.program_thresholds + assert "cmu-mscf" in result.program_thresholds + assert "gatech-qcf" in result.program_thresholds + + def test_accuracy_report_present(self): + records = _make_mixed_records() + result = calibrate_all(records) + + assert "total_predictions" in result.accuracy_report + assert result.accuracy_report["total_predictions"] > 0 + + def test_recommendations_generated(self): + records = _make_mixed_records() + result = calibrate_all(records) + # Should have at least a data quantity recommendation + assert len(result.recommendations) > 0 + + +# =================================================================== +# predict_outcome +# =================================================================== + + +class TestPredictOutcome: + """Tests for predict_outcome().""" + + def test_strong_applicant_accepted(self): + threshold = ProgramThreshold( + program_id="baruch-mfe", + gpa_target=3.85, + max_bg_tier_accepted=2, + observed_acceptance_rate=0.60, + ) + record = AdmissionRecord( + gpa_normalized=3.9, bg_tier=1, intern_score=8.0, + has_paper=True, has_research=True, + ) + result = predict_outcome(record, threshold) + assert result == "accepted" + + def test_weak_applicant_rejected(self): + threshold = ProgramThreshold( + program_id="baruch-mfe", + gpa_target=3.85, + max_bg_tier_accepted=2, + observed_acceptance_rate=0.10, + ) + record = AdmissionRecord( + gpa_normalized=3.2, bg_tier=5, intern_score=0.0, + has_paper=False, has_research=False, + ) + result = 
predict_outcome(record, threshold) + assert result == "rejected" + + +# =================================================================== +# generate_ranker_overrides +# =================================================================== + + +class TestGenerateRankerOverrides: + """Tests for generate_ranker_overrides().""" + + def test_generates_overrides(self): + records = _make_mixed_records() + result = calibrate_all(records) + overrides = generate_ranker_overrides(result) + + # Should have entries for programs with enough data + assert isinstance(overrides, dict) + for pid, ov in overrides.items(): + assert "reach_gpa_threshold" in ov + assert "safety_gpa_threshold" in ov + assert "observed_acceptance_rate" in ov + assert "confidence" in ov + + def test_override_values_reasonable(self): + records = _make_mixed_records() + result = calibrate_all(records) + overrides = generate_ranker_overrides(result) + + for pid, ov in overrides.items(): + assert 0 <= ov["reach_gpa_threshold"] <= 4.0 + assert 0 <= ov["safety_gpa_threshold"] <= 4.0 + assert 0 <= ov["observed_acceptance_rate"] <= 1.0