From 545f981daccea0a25eeea2264cf80e650eacb313 Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Tue, 24 Mar 2026 11:46:06 -0400 Subject: [PATCH 01/10] Improve area weight solver: range-based feasibility, per-constraint penalties, LP pre-check Three robustness enhancements for the area weight QP solver: - _drop_impossible_targets() checks achievable range within multiplier bounds, not just all-zeros rows - _check_feasibility() LP pre-check (scipy HiGHS) before QP identifies constraints needing slack - _assign_slack_penalties() gives reduced penalty to inherently noisy targets in low-AGI bins Also adds solver_overrides.py for YAML-based per-area parameter management. Co-Authored-By: Claude Opus 4.6 (1M context) --- tmd/areas/batch_weights.py | 117 +++++++++++++- tmd/areas/create_area_weights.py | 270 +++++++++++++++++++++++++++++-- tmd/areas/solver_overrides.py | 135 ++++++++++++++++ 3 files changed, 501 insertions(+), 21 deletions(-) create mode 100644 tmd/areas/solver_overrides.py diff --git a/tmd/areas/batch_weights.py b/tmd/areas/batch_weights.py index 9a5c1f45..71a60aef 100644 --- a/tmd/areas/batch_weights.py +++ b/tmd/areas/batch_weights.py @@ -22,22 +22,37 @@ import numpy as np import pandas as pd import yaml +from scipy.sparse import csc_matrix # Module-level cache for TMD data (one per worker process) _WORKER_VDF = None _WORKER_POP = None _WORKER_TARGET_DIR = None _WORKER_WEIGHT_DIR = None +_WORKER_MULTIPLIER_MAX = None +_WORKER_OVERRIDES = None -def _init_worker(target_dir=None, weight_dir=None): +def _init_worker( + target_dir=None, + weight_dir=None, + multiplier_max=None, + override_path=None, +): """Load TMD data once per worker process.""" global _WORKER_VDF, _WORKER_POP global _WORKER_TARGET_DIR, _WORKER_WEIGHT_DIR + global _WORKER_MULTIPLIER_MAX, _WORKER_OVERRIDES if target_dir is not None: _WORKER_TARGET_DIR = Path(target_dir) if weight_dir is not None: _WORKER_WEIGHT_DIR = Path(weight_dir) + if multiplier_max is not None: + _WORKER_MULTIPLIER_MAX = multiplier_max + if override_path is not None: + from tmd.areas.solver_overrides import load_overrides + + _WORKER_OVERRIDES = load_overrides(Path(override_path)) if _WORKER_VDF is not None: return from tmd.areas.create_area_weights import ( @@ -65,7 +80,9 @@ def _solve_one_area(area): AREA_SLACK_PENALTY, FIRST_YEAR, LAST_YEAR, + _assign_slack_penalties, _build_constraint_matrix, + _check_feasibility, _drop_impossible_targets, _print_multiplier_diagnostics, _print_slack_diagnostics, @@ -82,6 +99,29 @@ def _solve_one_area(area): tgt_dir = _WORKER_TARGET_DIR wgt_dir = _WORKER_WEIGHT_DIR params = _read_params(area, out, target_dir=tgt_dir) + + # Apply solver overrides (centralized file takes precedence) + if _WORKER_OVERRIDES is not None: + from tmd.areas.solver_overrides import ( + get_area_overrides, + get_drop_targets, + ) + + area_ovr = get_area_overrides(_WORKER_OVERRIDES, area) + # Override params with centralized overrides + for key in ( + "constraint_tol", + "slack_penalty", + "max_iter", + "multiplier_min", + "multiplier_max", + ): + if key in area_ovr: + params[key] = area_ovr[key] + drop_patterns = get_drop_targets(_WORKER_OVERRIDES, area) + else: + drop_patterns = [] + constraint_tol = params.get( "constraint_tol", params.get("target_ratio_tolerance", AREA_CONSTRAINT_TOL), @@ -89,17 +129,67 @@ def _solve_one_area(area): slack_penalty = params.get("slack_penalty", AREA_SLACK_PENALTY) max_iter = params.get("max_iter", AREA_MAX_ITER) multiplier_min = params.get("multiplier_min", AREA_MULTIPLIER_MIN) - multiplier_max = params.get("multiplier_max", AREA_MULTIPLIER_MAX) + default_max = ( + _WORKER_MULTIPLIER_MAX + if _WORKER_MULTIPLIER_MAX is not None + else AREA_MULTIPLIER_MAX + ) + multiplier_max = params.get("multiplier_max", default_max) B_csc, targets, labels, pop_share = _build_constraint_matrix( area, vdf, out, target_dir=tgt_dir ) B_csc, targets, labels = _drop_impossible_targets( - B_csc, targets, labels, out + B_csc, + targets, + labels, + out, + multiplier_max=multiplier_max, + constraint_tol=constraint_tol, ) + # Apply per-area target drops from override file + if drop_patterns: + from tmd.areas.solver_overrides import should_drop_target + + keep = [not should_drop_target(lbl, drop_patterns) for lbl in labels] + n_drop = sum(1 for k in keep if not k) + if n_drop > 0: + keep_arr = np.array(keep) + B_dense = B_csc.toarray() + B_csc = csc_matrix(B_dense[keep_arr, :]) + targets = targets[keep_arr] + labels = [lbl2 for lbl2, k in zip(labels, keep) if k] + out.write( + f"OVERRIDE: dropped {n_drop} targets," + f" {len(targets)} remaining\n" + ) + n_records = B_csc.shape[1] + # Pre-solve feasibility check + _check_feasibility( + B_csc, + targets, + labels, + n_records, + constraint_tol=constraint_tol, + multiplier_min=multiplier_min, + multiplier_max=multiplier_max, + out=out, + ) + + # Assign per-constraint slack penalties + per_constraint_penalties = _assign_slack_penalties( + labels, default_penalty=slack_penalty + ) + n_reduced = int((per_constraint_penalties < slack_penalty).sum()) + if n_reduced > 0: + out.write( + f"SLACK PENALTIES: {n_reduced}/{len(labels)}" + f" constraints have reduced penalty\n" + ) + # Check for per-record multiplier caps (from exhaustion limiting) caps_path = wgt_dir / f"{area}_record_caps.npy" if caps_path.exists(): @@ -117,7 +207,7 @@ def _solve_one_area(area): labels, n_records, constraint_tol=constraint_tol, - slack_penalty=slack_penalty, + slack_penalties=per_constraint_penalties, max_iter=max_iter, multiplier_min=multiplier_min, multiplier_max=multiplier_max, @@ -207,6 +297,8 @@ def run_batch( force=False, target_dir=None, weight_dir=None, + multiplier_max=None, + override_path=None, ): """ Run area weight optimization for multiple areas in parallel. @@ -272,9 +364,11 @@ def run_batch( - 1 ) # subtract header n = len(areas) + max_total = n * n_targets print( f"Processing {n} areas" - f" (up to {n_targets} targets each)" + f" (up to {n_targets} targets each," + f" {max_total:,} max total)" f" with {num_workers} workers..." ) print( @@ -294,7 +388,12 @@ def run_batch( with ProcessPoolExecutor( max_workers=num_workers, initializer=_init_worker, - initargs=(str(target_dir), str(weight_dir)), + initargs=( + str(target_dir), + str(weight_dir), + multiplier_max, + str(override_path) if override_path else None, + ), ) as executor: futures = { executor.submit(_solve_one_area, area): area for area in areas @@ -328,7 +427,11 @@ def run_batch( sys.stdout.write("\n") total = time.time() - t_start - print(f"\nCompleted {completed}/{n} areas in {total:.1f}s") + m, s = divmod(total, 60) + print( + f"\nCompleted {completed}/{n} areas in" + f" {total:.1f}s ({int(m)}m {s:.0f}s)" + ) if violated_areas: total_viol = sum(v for _, v in violated_areas) print( diff --git a/tmd/areas/create_area_weights.py b/tmd/areas/create_area_weights.py index f50cb35e..59ec94c1 100644 --- a/tmd/areas/create_area_weights.py +++ b/tmd/areas/create_area_weights.py @@ -28,10 +28,13 @@ import sys import time +import re + import clarabel import numpy as np import pandas as pd import yaml +from scipy.optimize import linprog from scipy.sparse import ( csc_matrix, diags as spdiags, @@ -65,13 +68,17 @@ # Default solver parameters AREA_CONSTRAINT_TOL = 0.005 AREA_SLACK_PENALTY = 1e6 +AREA_REDUCED_SLACK_PENALTY = 1e3 AREA_MAX_ITER = 2000 AREA_MULTIPLIER_MIN = 0.0 AREA_MULTIPLIER_MAX = 25.0 +CD_MULTIPLIER_MAX = 50.0 -# Default target/weight directories for states +# Default target/weight directories STATE_TARGET_DIR = AREAS_FOLDER / "targets" / "states" STATE_WEIGHT_DIR = AREAS_FOLDER / "weights" / "states" +CD_TARGET_DIR = AREAS_FOLDER / "targets" / "cds" +CD_WEIGHT_DIR = AREAS_FOLDER / "weights" / "cds" def _load_taxcalc_data(): @@ -229,28 +236,247 @@ def _build_constraint_matrix(area, vardf, out, target_dir=None): return B_csc, targets_arr, labels_list, pop_share -def _drop_impossible_targets(B_csc, targets, labels, out): - """Drop targets where all constraint matrix values are zero.""" - col_sums = np.abs(np.asarray(B_csc.sum(axis=1))).ravel() - all_zero = col_sums == 0 - if all_zero.any(): - n_drop = int(all_zero.sum()) - for i in np.where(all_zero)[0]: +def _drop_impossible_targets( + B_csc, + targets, + labels, + out, + multiplier_min=AREA_MULTIPLIER_MIN, + multiplier_max=AREA_MULTIPLIER_MAX, + constraint_tol=AREA_CONSTRAINT_TOL, +): + """ + Drop targets that cannot be reached within tolerance. + + A target is impossible if: + - All B matrix values are zero (no records contribute), or + - The target is outside the achievable range even with all + multipliers at their extreme bounds. + """ + B_dense = B_csc.toarray() + m = len(targets) + drop = np.zeros(m, dtype=bool) + + for j in range(m): + row = B_dense[j, :] + pos = row > 0 + neg = row < 0 + + # All zeros + if not pos.any() and not neg.any(): + out.write( + f"DROPPING impossible target" f" (all zeros): {labels[j]}\n" + ) + drop[j] = True + continue + + # Compute achievable range + max_val = ( + multiplier_max * row[pos].sum() + multiplier_min * row[neg].sum() + ) + min_val = ( + multiplier_min * row[pos].sum() + multiplier_max * row[neg].sum() + ) + + tgt = targets[j] + tol = abs(tgt) * constraint_tol + if max_val < tgt - tol or min_val > tgt + tol: out.write( - f"DROPPING impossible target" f" (all zeros): {labels[i]}\n" + f"DROPPING unreachable target: {labels[j]}" + f" (target={tgt:.0f}," + f" achievable=[{min_val:.0f}, {max_val:.0f}])\n" ) - keep = ~all_zero - B_dense = B_csc.toarray() + drop[j] = True + + n_drop = int(drop.sum()) + if n_drop > 0: + keep = ~drop B_csc = csc_matrix(B_dense[keep, :]) targets = targets[keep] labels = [lab for lab, k in zip(labels, keep) if k] out.write( - f"Dropped {n_drop} impossible targets," + f"Dropped {n_drop} impossible/unreachable targets," f" {len(targets)} remaining\n" ) return B_csc, targets, labels +def _check_feasibility( + B_csc, + targets, + labels, + n_records, + constraint_tol=AREA_CONSTRAINT_TOL, + multiplier_min=AREA_MULTIPLIER_MIN, + multiplier_max=AREA_MULTIPLIER_MAX, + out=None, +): + """ + Fast LP feasibility check before running the full QP. + + Solves: minimize 0 subject to + cl <= Bx + s_lo - s_hi <= cu + multiplier_min <= x <= multiplier_max + s_lo, s_hi >= 0 + + If infeasible, identifies which constraints contribute most + to the infeasibility by solving a relaxed LP that minimizes + total slack. + + Returns + ------- + dict with keys: + feasible : bool + slack_needed : np.ndarray or None + Per-constraint slack needed (0 if feasible). + worst_labels : list of (label, slack_amount) + Top constraints requiring the most slack. + """ + if out is None: + out = sys.stdout + + m = len(targets) + abs_targets = np.abs(targets) + tol_band = abs_targets * constraint_tol + cl = targets - tol_band + cu = targets + tol_band + + B_dense = B_csc.toarray() + + # LP: minimize sum(s_lo + s_hi) + # Variables: [x (n_records), s_lo (m), s_hi (m)] + n_total = n_records + 2 * m + + # Objective: minimize sum of slacks + c_obj = np.zeros(n_total) + c_obj[n_records:] = 1.0 # minimize total slack + + # Constraints: cl <= Bx + s_lo - s_hi <= cu + # Rewrite as: + # Bx + s_lo - s_hi >= cl => -Bx - s_lo + s_hi <= -cl + # Bx + s_lo - s_hi <= cu + I_m = np.eye(m) + # [B | I | -I] x <= cu + A_upper = np.hstack([B_dense, I_m, -I_m]) + # [-B | -I | I] x <= -cl + A_lower = np.hstack([-B_dense, -I_m, I_m]) + A_ub = np.vstack([A_upper, A_lower]) + b_ub = np.concatenate([cu, -cl]) + + # Bounds + bounds = [] + for i in range(n_records): + lb = multiplier_min + ub = multiplier_max + if isinstance(multiplier_max, np.ndarray): + ub = multiplier_max[i] + bounds.append((lb, ub)) + for _ in range(2 * m): + bounds.append((0.0, None)) + + result = linprog( + c_obj, + A_ub=A_ub, + b_ub=b_ub, + bounds=bounds, + method="highs", + options={"presolve": True, "time_limit": 30}, + ) + + info = {"feasible": False, "slack_needed": None, "worst_labels": []} + + if result.success: + s_lo = result.x[n_records : n_records + m] + s_hi = result.x[n_records + m :] + total_slack = s_lo + s_hi + info["feasible"] = total_slack.max() < 1e-6 + info["slack_needed"] = total_slack + + if not info["feasible"]: + # Report constraints needing the most slack + worst_idx = np.argsort(total_slack)[::-1] + worst = [] + for idx in worst_idx[:10]: + if total_slack[idx] > 1e-6: + rel = total_slack[idx] / max(abs(targets[idx]), 1) + worst.append((labels[idx], total_slack[idx], rel)) + info["worst_labels"] = worst + + out.write( + f"PRE-SOLVE FEASIBILITY: INFEASIBLE" + f" — {len(worst)} constraints need slack\n" + ) + for lbl, slk, rel in worst[:5]: + out.write( + f" {lbl}: slack={slk:.0f}" f" ({rel:.1%} of target)\n" + ) + else: + out.write("PRE-SOLVE FEASIBILITY: OK\n") + else: + out.write( + f"PRE-SOLVE FEASIBILITY: LP solver failed" f" ({result.message})\n" + ) + + return info + + +def _assign_slack_penalties( + labels, + default_penalty=AREA_SLACK_PENALTY, + reduced_penalty=AREA_REDUCED_SLACK_PENALTY, +): + """ + Assign per-constraint slack penalties based on label content. + + Most constraints get the default (high) penalty. Constraints + involving variables and AGI bins where SOI-to-TMD mapping is + inherently noisy get a reduced penalty — the solver will still + try to hit them but will relax them before distorting weights. + + Reduced-penalty targets: + - e02400, e00300, e26270 amounts in AGI stubs 1-3 (<$25K). + These income types are concentrated among high-income or + elderly filers; low-AGI bin targets are noisy and can + conflict with other constraints in extreme CDs. + - Filing-status counts (fs=1,2,4) in stubs 1-2 (<$10K). + Very small cells in the lowest income bins. + + Returns + ------- + np.ndarray + Penalty value for each constraint, same length as labels. + """ + penalties = np.full(len(labels), default_penalty) + reduced_vars = {"e02400", "e00300", "e26270"} + + for i, label in enumerate(labels): + parts = label.split("/") + varname = parts[0] + attrs = {} + for p in parts[1:]: + if "=" in p: + k, v = p.split("=", 1) + attrs[k] = v + + cnt = int(attrs.get("cnt", -1)) + fs = int(attrs.get("fs", 0)) + agi_raw = attrs.get("agi", "") + + # Parse AGI upper bound from "[lo,hi)" + agihi = 9e99 + m = re.match(r"\[([^,]+),([^)]+)\)", agi_raw) + if m: + agihi = float(m.group(2)) + + # Reduced penalty for problematic variable-bin combos + if varname in reduced_vars and cnt == 0 and agihi <= 25000: + penalties[i] = reduced_penalty + elif cnt == 1 and fs > 0 and agihi <= 10000: + penalties[i] = reduced_penalty + + return penalties + + def _solve_area_qp( # pylint: disable=unused-argument B_csc, targets, @@ -258,6 +484,7 @@ def _solve_area_qp( # pylint: disable=unused-argument n_records, constraint_tol=AREA_CONSTRAINT_TOL, slack_penalty=AREA_SLACK_PENALTY, + slack_penalties=None, max_iter=AREA_MAX_ITER, multiplier_min=AREA_MULTIPLIER_MIN, multiplier_max=AREA_MULTIPLIER_MAX, @@ -269,6 +496,12 @@ def _solve_area_qp( # pylint: disable=unused-argument Parameters ---------- + slack_penalty : float + Default penalty for constraint slack (used when + slack_penalties is None). + slack_penalties : np.ndarray, optional + Per-constraint slack penalties (length m). Overrides + slack_penalty when provided. weight_penalty : float Penalty weight on (x-1)^2 relative to constraint slack. Higher values keep multipliers closer to 1.0 @@ -289,9 +522,14 @@ def _solve_area_qp( # pylint: disable=unused-argument cu = targets + tol_band # diagonal Hessian: 2*alpha for x, 2*M for slacks + # Each slack pair (s_lo, s_hi) gets the same penalty hess_diag = np.empty(n_total) hess_diag[:n_records] = 2.0 * weight_penalty - hess_diag[n_records:] = 2.0 * slack_penalty + if slack_penalties is not None: + hess_diag[n_records : n_records + m] = 2.0 * slack_penalties + hess_diag[n_records + m :] = 2.0 * slack_penalties + else: + hess_diag[n_records:] = 2.0 * slack_penalty P = spdiags(hess_diag, format="csc") # linear term: -2*alpha for x @@ -366,6 +604,7 @@ def _solve_area_qp( # pylint: disable=unused-argument "iterations": result.iterations, "solve_time": elapsed, "clarabel_solve_time": result.solve_time, + "dual": np.array(result.z) if result.z is not None else None, } return x_opt, s_lo, s_hi, info @@ -556,13 +795,16 @@ def create_area_weights_file( out.write(f" max |relative error|: {rel_err_x1.max():.6f}\n") # solve QP + per_constraint_penalties = _assign_slack_penalties( + labels, default_penalty=slack_penalty + ) x_opt, s_lo, s_hi, _info = _solve_area_qp( B_csc, targets, labels, n_records, constraint_tol=constraint_tol, - slack_penalty=slack_penalty, + slack_penalties=per_constraint_penalties, max_iter=max_iter, multiplier_min=multiplier_min, multiplier_max=multiplier_max, diff --git a/tmd/areas/solver_overrides.py b/tmd/areas/solver_overrides.py new file mode 100644 index 00000000..502c5b4c --- /dev/null +++ b/tmd/areas/solver_overrides.py @@ -0,0 +1,135 @@ +""" +Per-area solver override management. + +Provides a centralized mechanism for storing and loading per-area +solver parameter overrides. The override file is a YAML file with +a ``_defaults`` section and per-area entries. + +Developer mode generates this file by running iterative LP/QP +feasibility checks and applying a relaxation cascade. Production +mode reads it to configure each area's solve in a single pass. + +File format:: + + # Solver overrides for CDs + _defaults: + multiplier_max: 50 + constraint_tol: 0.005 + + # Only areas needing special treatment appear below + fl28: + drop_targets: + - "e26270/cnt=0/agi=[10000.0,25000.0)" + ny12: + constraint_tol: 0.01 + drop_targets: + - "e00300/cnt=0/agi=[-9e+99,1.0)" +""" + +from pathlib import Path +from typing import Dict, List + +import yaml + + +def load_overrides(override_path: Path) -> dict: + """ + Load solver overrides from a YAML file. + + Returns a dict with ``_defaults`` and per-area entries. + If the file doesn't exist, returns empty defaults. + """ + if not override_path.exists(): + return {"_defaults": {}} + with open(override_path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f.read()) + if data is None: + data = {} + if "_defaults" not in data: + data["_defaults"] = {} + return data + + +def get_area_overrides( + overrides: dict, + area: str, +) -> dict: + """ + Get merged overrides for a specific area. + + Returns defaults updated with any area-specific overrides. + Per-area values take precedence over defaults. + """ + defaults = dict(overrides.get("_defaults", {})) + area_specific = overrides.get(area, {}) + if area_specific: + defaults.update(area_specific) + return defaults + + +def get_drop_targets(overrides: dict, area: str) -> List[str]: + """ + Get list of target labels to drop for an area. + + Returns a list of constraint label patterns to exclude + from the QP. Empty list if no drops needed. + """ + area_ovr = get_area_overrides(overrides, area) + return area_ovr.get("drop_targets", []) + + +def should_drop_target( + label: str, + drop_patterns: List[str], +) -> bool: + """ + Check if a target label matches any drop pattern. + + Patterns are matched as substrings of the full label. + For example, "e26270/cnt=0/agi=[10000.0,25000.0)" matches + any label containing that string. + """ + for pattern in drop_patterns: + if pattern in label: + return True + return False + + +def write_overrides( + override_path: Path, + defaults: dict, + area_overrides: Dict[str, dict], +): + """ + Write solver overrides to a YAML file. + + Parameters + ---------- + override_path : Path + Output file path. + defaults : dict + Default parameters for all areas. + area_overrides : dict + Per-area overrides (only areas needing special treatment). + """ + data = {"_defaults": defaults} + for area in sorted(area_overrides.keys()): + ovr = area_overrides[area] + if ovr: # skip empty overrides + data[area] = ovr + + override_path.parent.mkdir(parents=True, exist_ok=True) + with open(override_path, "w", encoding="utf-8") as f: + # Write header comment + f.write("# Solver overrides — auto-generated by developer mode\n") + f.write( + "# Edit manually only if automatic relaxation is insufficient\n" + ) + f.write("\n") + yaml.dump( + data, + f, + default_flow_style=False, + sort_keys=False, + allow_unicode=True, + ) From 4ce5e4eb92a230b5baba96a7f8fecdfb1ac25dd4 Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Tue, 24 Mar 2026 11:56:13 -0400 Subject: [PATCH 02/10] Add post-solve state weight tests, remove stale "Pass 1" label - test_state_weight_results.py: validates weight file existence, nonnegativity, no NaN/inf, solver status, and target accuracy for 5 representative states (AL, CA, MN, NY, TX) - Tests skip gracefully if weight files not yet generated - Target accuracy allows 0.05% margin above solver tolerance for floating-point differences in weight-file roundtrip - Remove "Pass 1:" label from solve_weights print (only one pass) Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/test_state_weight_results.py | 163 +++++++++++++++++++++++++++++ tmd/areas/solve_weights.py | 3 +- 2 files changed, 164 insertions(+), 2 deletions(-) create mode 100644 tests/test_state_weight_results.py diff --git a/tests/test_state_weight_results.py b/tests/test_state_weight_results.py new file mode 100644 index 00000000..1ade71c4 --- /dev/null +++ b/tests/test_state_weight_results.py @@ -0,0 +1,163 @@ +""" +Post-solve validation of state weight files. + +These tests verify that the actual state weight outputs are valid. +They are skipped if weight files have not been generated yet +(i.e., solve_weights --scope states has not been run). + +Run after: + python -m tmd.areas.prepare_targets --scope states + python -m tmd.areas.solve_weights --scope states --workers 8 +""" + +import io +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +from tmd.areas.create_area_weights import ( + AREA_CONSTRAINT_TOL, + STATE_TARGET_DIR, + STATE_WEIGHT_DIR, + _build_constraint_matrix, + _drop_impossible_targets, + _load_taxcalc_data, +) +from tmd.areas.prepare.constants import ALL_STATES +from tmd.imputation_assumptions import TAXYEAR + +# Skip entire module if weight files haven't been generated +_WEIGHT_FILES = list(STATE_WEIGHT_DIR.glob("*_tmd_weights.csv.gz")) +pytestmark = pytest.mark.skipif( + len(_WEIGHT_FILES) < 51, + reason="State weight files not generated yet", +) + +# Also need cached data files for target accuracy checks +_CACHED = Path(__file__).parent.parent / "tmd" / "storage" / "output" +_HAS_CACHED = (_CACHED / "tmd.csv.gz").exists() and ( + _CACHED / "cached_c00100.npy" +).exists() + + +class TestStateWeightFiles: + """Basic validity checks on all 51 state weight files.""" + + def test_all_states_have_weight_files(self): + """Every state has a weight file.""" + for st in ALL_STATES: + wpath = STATE_WEIGHT_DIR / f"{st.lower()}_tmd_weights.csv.gz" + assert wpath.exists(), f"Missing weight file for {st}" + + def test_all_states_have_log_files(self): + """Every state has a solver log.""" + for st in ALL_STATES: + logpath = STATE_WEIGHT_DIR / f"{st.lower()}.log" + assert logpath.exists(), f"Missing log file for {st}" + + def test_weight_columns(self): + """Weight files have expected year columns.""" + wpath = STATE_WEIGHT_DIR / "mn_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + expected = [f"WT{yr}" for yr in range(TAXYEAR, 2035)] + assert list(wdf.columns) == expected + + def test_weight_row_count(self): + """Weight files have one row per TMD record.""" + wpath = STATE_WEIGHT_DIR / "mn_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + # Should match TMD record count (215,494 for 2022) + assert len(wdf) > 200_000 + + @pytest.mark.parametrize( + "state", + [s.lower() for s in ALL_STATES], + ) + def test_weights_nonnegative(self, state): + """All weights are non-negative.""" + wpath = STATE_WEIGHT_DIR / f"{state}_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + assert (wdf >= 0).all().all(), f"{state}: negative weights found" + + @pytest.mark.parametrize( + "state", + [s.lower() for s in ALL_STATES], + ) + def test_weights_no_nan(self, state): + """No NaN or inf values in weights.""" + wpath = STATE_WEIGHT_DIR / f"{state}_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + assert not wdf.isna().any().any(), f"{state}: NaN values found" + assert np.isfinite(wdf.values).all(), f"{state}: inf values found" + + @pytest.mark.parametrize( + "state", + [s.lower() for s in ALL_STATES], + ) + def test_solver_status_solved(self, state): + """Solver log reports Solved status.""" + logpath = STATE_WEIGHT_DIR / f"{state}.log" + log_text = logpath.read_text() + assert ( + "Solver status: Solved" in log_text + ), f"{state}: solver did not report Solved" + + +@pytest.mark.skipif( + not _HAS_CACHED, + reason="Cached TMD data files not available", +) +class TestStateTargetAccuracy: + """Verify weighted sums hit targets within tolerance.""" + + @pytest.fixture(scope="class") + def vdf(self): + """Load TMD data once for all accuracy tests.""" + return _load_taxcalc_data() + + @pytest.mark.parametrize( + "state", + ["al", "ca", "mn", "ny", "tx"], + ) + def test_targets_hit(self, vdf, state): + """Weighted sums match targets within constraint tolerance.""" + out = io.StringIO() + B_csc, targets, labels, pop_share = _build_constraint_matrix( + state, + vdf, + out, + target_dir=STATE_TARGET_DIR, + ) + B_csc, targets, labels = _drop_impossible_targets( + B_csc, + targets, + labels, + out, + ) + + # Load weights and compute multipliers + wpath = STATE_WEIGHT_DIR / f"{state}_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + area_weights = wdf[f"WT{TAXYEAR}"].values + w0 = pop_share * vdf["s006"].values + # Avoid division by zero for zero-weight records + safe_w0 = np.where(w0 > 0, w0, 1.0) + x = area_weights / safe_w0 + x = np.where(w0 > 0, x, 0.0) + + # Check target accuracy + achieved = np.asarray(B_csc @ x).ravel() + rel_errors = np.abs(achieved - targets) / np.maximum( + np.abs(targets), 1.0 + ) + # Allow small margin above solver tolerance for floating-point + # differences between solver internals and weight-file roundtrip + eps = 1e-4 + n_violated = int((rel_errors > AREA_CONSTRAINT_TOL + eps).sum()) + max_err = rel_errors.max() + assert n_violated == 0, ( + f"{state}: {n_violated} targets violated, " + f"max error = {max_err * 100:.3f}%" + ) diff --git a/tmd/areas/solve_weights.py b/tmd/areas/solve_weights.py index ce8af3b0..2f085368 100644 --- a/tmd/areas/solve_weights.py +++ b/tmd/areas/solve_weights.py @@ -71,8 +71,7 @@ def solve_state_weights( t0 = time.time() - # --- Pass 1: unconstrained solve --- - print("Pass 1: solving state weights...") + print("Solving state weights...") run_batch( num_workers=num_workers, area_filter=area_filter, From af35219429bc48fd9163fca49fe78898c9db7dd4 Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Tue, 24 Mar 2026 12:08:12 -0400 Subject: [PATCH 03/10] Fix quality report --scope states CLI parsing The --scope states argument was being split into ["STATES"] instead of being treated as the default all-states scope. Now recognized as a keyword that maps to None (all states). Co-Authored-By: Claude Opus 4.6 (1M context) --- tmd/areas/quality_report.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tmd/areas/quality_report.py b/tmd/areas/quality_report.py index 1531cb2e..b18837df 100644 --- a/tmd/areas/quality_report.py +++ b/tmd/areas/quality_report.py @@ -760,7 +760,7 @@ def main(): args = parser.parse_args() areas = None - if args.scope: + if args.scope and args.scope.lower() != "states": areas = [s.strip().upper() for s in args.scope.split(",")] report = generate_report(areas) From cc666af16c141d7a7ec9a541e5aba2c659bd4d84 Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Wed, 25 Mar 2026 05:39:46 -0400 Subject: [PATCH 04/10] Reduce solver memory: sparse-only matrix construction and LP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Build constraint matrix B directly in sparse COO format instead of constructing dense A (310 MB) and B (310 MB) intermediates. Use sparse row iteration in _drop_impossible_targets() and sparse LP construction in _check_feasibility(). Peak memory per worker: 1,244 MB (master) → 798 MB (this commit). With 16 workers: ~20 GB → ~13 GB, preventing OOM on WSL2. Co-Authored-By: Claude Opus 4.6 (1M context) --- tmd/areas/create_area_weights.py | 126 ++++++++++++++++++++----------- 1 file changed, 82 insertions(+), 44 deletions(-) diff --git a/tmd/areas/create_area_weights.py b/tmd/areas/create_area_weights.py index 59ec94c1..1eb53590 100644 --- a/tmd/areas/create_area_weights.py +++ b/tmd/areas/create_area_weights.py @@ -36,7 +36,7 @@ import yaml from scipy.optimize import linprog from scipy.sparse import ( - csc_matrix, + coo_matrix, diags as spdiags, eye as speye, hstack, @@ -152,10 +152,29 @@ def _build_constraint_matrix(area, vardf, out, target_dir=None): targets_file = target_dir / f"{area}_targets.csv" tdf = pd.read_csv(targets_file, comment="#") + # Pre-cache column arrays to avoid repeated DataFrame lookups + c00100_arr = vardf["c00100"].values.astype(float) + mars_arr = vardf["MARS"].values.astype(float) + data_source_arr = vardf["data_source"].values + scope1_mask = (data_source_arr == 1).astype(float) + scope2_mask = (data_source_arr == 0).astype(float) + ones_arr = np.ones(numobs, dtype=float) + + unique_vars = tdf.loc[tdf["count"] != 1, "varname"].unique() + var_cache = {} + for vname in unique_vars: + var_cache[vname] = vardf[vname].astype(float).values + targets_list = [] labels_list = [] - columns = [] pop_share = None + w0 = None + + # Build B matrix directly in sparse COO format, avoiding dense + # intermediates that would require ~620 MB for 215K × 179. + row_indices = [] + col_indices = [] + data_values = [] for row_idx, row in enumerate(tdf.itertuples(index=False)): line = f"{area}:target{row_idx + 1}" @@ -186,51 +205,58 @@ def _build_constraint_matrix(area, vardf, out, target_dir=None): f" / {national_population:.0f}" f" = {pop_share:.6f}\n" ) + w0 = pop_share * vardf.s006.values - # construct variable array for this target + # construct variable array from cache assert 0 <= row.count <= 4, f"count {row.count} not in [0,4] on {line}" if row.count == 0: - var_array = vardf[row.varname].astype(float).values + var_array = var_cache[row.varname] elif row.count == 1: - var_array = np.ones(numobs, dtype=float) + var_array = ones_arr elif row.count == 2: - var_array = (vardf[row.varname] != 0).astype(float).values + var_array = (var_cache[row.varname] != 0).astype(float) elif row.count == 3: - var_array = (vardf[row.varname] > 0).astype(float).values + var_array = (var_cache[row.varname] > 0).astype(float) else: - var_array = (vardf[row.varname] < 0).astype(float).values + var_array = (var_cache[row.varname] < 0).astype(float) - # construct mask - mask = np.ones(numobs, dtype=float) + # construct mask using cached arrays assert 0 <= row.scope <= 2, f"scope {row.scope} not in [0,2] on {line}" if row.scope == 1: - mask *= (vardf.data_source == 1).values.astype(float) + mask = scope1_mask.copy() elif row.scope == 2: - mask *= (vardf.data_source == 0).values.astype(float) + mask = scope2_mask.copy() + else: + mask = ones_arr.copy() - in_agi_bin = (vardf.c00100 >= row.agilo) & (vardf.c00100 < row.agihi) - mask *= in_agi_bin.values.astype(float) + mask *= (c00100_arr >= row.agilo) & (c00100_arr < row.agihi) assert ( 0 <= row.fstatus <= 5 ), f"fstatus {row.fstatus} not in [0,5] on {line}" if row.fstatus > 0: - mask *= (vardf.MARS == row.fstatus).values.astype(float) + mask *= mars_arr == row.fstatus - # A[j,i] = mask * var_array (data values for this constraint) - columns.append(mask * var_array) + # B[j,i] = w0[i] * mask[i] * var_array[i] + # Store only nonzero entries (sparse COO format) + b_row = w0 * mask * var_array + nz = np.nonzero(b_row)[0] + if len(nz) > 0: + row_indices.append(np.full(len(nz), row_idx, dtype=np.int32)) + col_indices.append(nz.astype(np.int32)) + data_values.append(b_row[nz]) assert pop_share is not None, "XTOT target not found" - # A matrix: n_records x n_targets (each column is a constraint) - A_dense = np.column_stack(columns) - - # B = diag(w0) @ A where w0 = pop_share * s006 - w0 = pop_share * vardf.s006.values - B_dense = w0[:, np.newaxis] * A_dense - - # convert to sparse (n_targets x n_records) for Clarabel - B_csc = csc_matrix(B_dense.T) + # Assemble sparse B directly from COO — no dense intermediate + n_targets = len(targets_list) + B_csc = coo_matrix( + ( + np.concatenate(data_values), + (np.concatenate(row_indices), np.concatenate(col_indices)), + ), + shape=(n_targets, numobs), + ).tocsc() targets_arr = np.array(targets_list) return B_csc, targets_arr, labels_list, pop_share @@ -253,12 +279,12 @@ def _drop_impossible_targets( - The target is outside the achievable range even with all multipliers at their extreme bounds. """ - B_dense = B_csc.toarray() m = len(targets) drop = np.zeros(m, dtype=bool) for j in range(m): - row = B_dense[j, :] + # Extract one sparse row at a time — avoids 310 MB dense copy + row = B_csc.getrow(j).toarray().ravel() pos = row > 0 neg = row < 0 @@ -291,7 +317,9 @@ def _drop_impossible_targets( n_drop = int(drop.sum()) if n_drop > 0: keep = ~drop - B_csc = csc_matrix(B_dense[keep, :]) + # Slice sparse matrix directly — no dense conversion + keep_idx = np.where(keep)[0] + B_csc = B_csc[keep_idx, :] targets = targets[keep] labels = [lab for lab, k in zip(labels, keep) if k] out.write( @@ -341,8 +369,6 @@ def _check_feasibility( cl = targets - tol_band cu = targets + tol_band - B_dense = B_csc.toarray() - # LP: minimize sum(s_lo + s_hi) # Variables: [x (n_records), s_lo (m), s_hi (m)] n_total = n_records + 2 * m @@ -355,24 +381,36 @@ def _check_feasibility( # Rewrite as: # Bx + s_lo - s_hi >= cl => -Bx - s_lo + s_hi <= -cl # Bx + s_lo - s_hi <= cu - I_m = np.eye(m) + # Build entirely in sparse format to avoid ~1.2 GB dense alloc + I_m = speye(m, format="csc") # [B | I | -I] x <= cu - A_upper = np.hstack([B_dense, I_m, -I_m]) + A_upper = hstack([B_csc, I_m, -I_m], format="csc") # [-B | -I | I] x <= -cl - A_lower = np.hstack([-B_dense, -I_m, I_m]) - A_ub = np.vstack([A_upper, A_lower]) + A_lower = hstack([-B_csc, -I_m, I_m], format="csc") + A_ub = vstack([A_upper, A_lower], format="csc") b_ub = np.concatenate([cu, -cl]) # Bounds - bounds = [] - for i in range(n_records): - lb = multiplier_min - ub = multiplier_max - if isinstance(multiplier_max, np.ndarray): - ub = multiplier_max[i] - bounds.append((lb, ub)) - for _ in range(2 * m): - bounds.append((0.0, None)) + if isinstance(multiplier_max, np.ndarray): + x_ub = multiplier_max + else: + x_ub = np.full(n_records, multiplier_max) + bounds = np.column_stack( + [ + np.concatenate( + [ + np.full(n_records, multiplier_min), + np.zeros(2 * m), + ] + ), + np.concatenate( + [ + x_ub, + np.full(2 * m, np.inf), + ] + ), + ] + ) result = linprog( c_obj, From 845e207d6651afab9a81cc9ad6c803eabd57e82a Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Wed, 25 Mar 2026 05:48:12 -0400 Subject: [PATCH 05/10] Reduce memory: trim TMD columns, preload in parent process MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two complementary memory optimizations: 1. Column trimming in _load_taxcalc_data(): drop unused columns from TMD DataFrame (109 → ~30 columns), saving ~150 MB per worker. Uses pattern matching (e*, c*, p* prefixes) so new target variables are automatically retained. 2. Parent-process preloading in batch_weights.py: load TMD once before forking workers instead of once per worker. On Linux, fork shares memory pages copy-on-write, saving ~150 MB × (num_workers - 1). Combined with the previous sparse matrix commit, peak memory per worker drops from ~1.8 GB (PR1 before fixes) to ~0.8 GB. With 16 workers this is ~13 GB vs ~29 GB, preventing OOM on WSL2. Co-Authored-By: Claude Opus 4.6 (1M context) --- tmd/areas/batch_weights.py | 55 ++++++++++++++++++++++++-------- tmd/areas/create_area_weights.py | 19 ++++++++++- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/tmd/areas/batch_weights.py b/tmd/areas/batch_weights.py index 71a60aef..09f7aff0 100644 --- a/tmd/areas/batch_weights.py +++ b/tmd/areas/batch_weights.py @@ -24,7 +24,16 @@ import yaml from scipy.sparse import csc_matrix -# Module-level cache for TMD data (one per worker process) +# Module-level globals shared across workers via fork copy-on-write. +# +# _WORKER_VDF and _WORKER_POP are loaded ONCE in the parent process +# (by _preload_data) before ProcessPoolExecutor forks workers. +# On Linux, forked children inherit these pages read-only; the OS +# shares the physical memory until a child writes to it. This saves +# ~190 MB × (num_workers - 1) compared to loading per-worker. +# +# The remaining globals are per-worker configuration set by +# _init_worker (called as the executor's initializer). _WORKER_VDF = None _WORKER_POP = None _WORKER_TARGET_DIR = None @@ -33,14 +42,37 @@ _WORKER_OVERRIDES = None +def _preload_data(): + """Load TMD data in the parent process before forking workers. + + On Linux (fork), child processes share the parent's memory + pages copy-on-write. Loading here instead of per-worker avoids + duplicating ~190 MB × N workers. + """ + global _WORKER_VDF, _WORKER_POP + if _WORKER_VDF is not None: + return + from tmd.areas.create_area_weights import ( + POPFILE_PATH, + _load_taxcalc_data, + ) + + _WORKER_VDF = _load_taxcalc_data() + with open(POPFILE_PATH, "r", encoding="utf-8") as pf: + _WORKER_POP = yaml.safe_load(pf.read()) + + def _init_worker( target_dir=None, weight_dir=None, multiplier_max=None, override_path=None, ): - """Load TMD data once per worker process.""" - global _WORKER_VDF, _WORKER_POP + """Initialize per-worker state (directories, overrides). + + TMD data is already loaded in the parent via _preload_data() + and shared copy-on-write through fork. + """ global _WORKER_TARGET_DIR, _WORKER_WEIGHT_DIR global _WORKER_MULTIPLIER_MAX, _WORKER_OVERRIDES if target_dir is not None: @@ -53,16 +85,9 @@ def _init_worker( from tmd.areas.solver_overrides import load_overrides _WORKER_OVERRIDES = load_overrides(Path(override_path)) - if _WORKER_VDF is not None: - return - from tmd.areas.create_area_weights import ( - POPFILE_PATH, - _load_taxcalc_data, - ) - - _WORKER_VDF = _load_taxcalc_data() - with open(POPFILE_PATH, "r", encoding="utf-8") as pf: - _WORKER_POP = yaml.safe_load(pf.read()) + # Fallback: load data if not preloaded (e.g. single-area solve) + if _WORKER_VDF is None: + _preload_data() def _solve_one_area(area): @@ -379,6 +404,10 @@ def run_batch( # Ensure weights directory exists weight_dir.mkdir(parents=True, exist_ok=True) + # Load TMD data in parent before forking — workers share via + # copy-on-write, saving ~190 MB × (num_workers - 1). + _preload_data() + t_start = time.time() completed = 0 violated_areas = [] diff --git a/tmd/areas/create_area_weights.py b/tmd/areas/create_area_weights.py index 1eb53590..b774f9f1 100644 --- a/tmd/areas/create_area_weights.py +++ b/tmd/areas/create_area_weights.py @@ -82,7 +82,11 @@ def _load_taxcalc_data(): - """Load TMD data with cached AGI and selected Tax-Calculator outputs.""" + """Load TMD data with cached AGI and selected Tax-Calculator outputs. + + After loading, drops columns not used by the solver to reduce + per-worker memory (~150 MB savings with 109 → ~25 columns). + """ vdf = pd.read_csv(INFILE_PATH) vdf["c00100"] = np.load(TAXCALC_AGI_CACHE) if CACHED_ALLVARS_PATH.exists(): @@ -94,6 +98,19 @@ def _load_taxcalc_data(): if "p22250" in vdf.columns and "p23250" in vdf.columns: vdf["capgains_net"] = vdf["p22250"] + vdf["p23250"] assert np.all(vdf.s006 > 0), "Not all weights are positive" + # Drop columns not used by the solver to reduce memory. + # Keep infrastructure columns + any column that could be a target + # variable (e*, c*, p* prefixes, plus synthetic variables). + # This adapts automatically as target specs change. + _INFRA_COLS = {"s006", "MARS", "data_source", "XTOT", "RECID"} + _SYNTH_COLS = {"capgains_net", "eitc", "ctc_total"} + keep = set() + for col in vdf.columns: + if col in _INFRA_COLS or col in _SYNTH_COLS: + keep.add(col) + elif col[:1] in ("e", "c", "p") and col[1:2].isdigit(): + keep.add(col) + vdf = vdf[sorted(keep)] return vdf From 9bb9b129ec940f8f40bac44819bc61959bc854ba Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Wed, 25 Mar 2026 06:11:15 -0400 Subject: [PATCH 06/10] Add on-demand fingerprint test for weight reproducibility Simple hash-based test: for each area, rounds weights to integers and sums them. Hash of per-area sums catches any change in results. Run manually (not part of make test): pytest tests/test_fingerprint.py -v --update-fingerprint # save reference pytest tests/test_fingerprint.py -v # compare Co-Authored-By: Claude Opus 4.6 (1M context) --- Makefile | 2 +- tests/conftest.py | 9 ++ tests/test_fingerprint.py | 202 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 tests/test_fingerprint.py diff --git a/Makefile b/Makefile index 8fe3f73e..381cb3e5 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ tmd_files: tmd/storage/output/tmd.csv.gz \ .PHONY=test test: tmd_files - pytest . -v -n4 --ignore=tests/national_targets_pipeline + pytest . -v -n4 --ignore=tests/national_targets_pipeline --ignore=tests/test_fingerprint.py .PHONY=data data: install tmd_files test diff --git a/tests/conftest.py b/tests/conftest.py index c86cffce..6f54518b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,15 @@ np.seterr(all="raise") +def pytest_addoption(parser): + parser.addoption( + "--update-fingerprint", + action="store_true", + default=False, + help="Save current results as reference fingerprint", + ) + + def create_tmd_records( data_path, weights_path, growfactors_path, exact_calculations=True ): diff --git a/tests/test_fingerprint.py b/tests/test_fingerprint.py new file mode 100644 index 00000000..9eae4ae6 --- /dev/null +++ b/tests/test_fingerprint.py @@ -0,0 +1,202 @@ +""" +On-demand fingerprint test for area weight results. + +NOT run by `make test` (excluded in Makefile). +Run manually after a full solve: + + pytest tests/test_fingerprint.py -v + pytest tests/test_fingerprint.py -v --update-fingerprint + +The first run saves a reference fingerprint. +Subsequent runs compare against it. + +Fingerprint method: for each area, round weights to nearest integer, +sum them, and hash the per-area sums. This is simple, fast, and +catches any meaningful change in results. +""" + +import hashlib +import json + +import numpy as np +import pandas as pd +import pytest + +from tmd.areas import AREAS_FOLDER + +FINGERPRINT_DIR = AREAS_FOLDER / "fingerprints" +STATE_WEIGHT_DIR = AREAS_FOLDER / "weights" / "states" + +ALL_STATES = [ + "AL", + "AK", + "AZ", + "AR", + "CA", + "CO", + "CT", + "DC", + "DE", + "FL", + "GA", + "HI", + "ID", + "IL", + "IN", + "IA", + "KS", + "KY", + "LA", + "ME", + "MD", + "MA", + "MI", + "MN", + "MS", + "MO", + "MT", + "NE", + "NV", + "NH", + "NJ", + "NM", + "NY", + "NC", + "ND", + "OH", + "OK", + "OR", + "PA", + "RI", + "SC", + "SD", + "TN", + "TX", + "UT", + "VT", + "VA", + "WA", + "WV", + "WI", + "WY", +] + + +def _compute_fingerprint(areas, weight_dir): + """Compute fingerprint from weight files. + + For each area, reads the first WT column, rounds weights to + nearest integer, and records the sum. The collection of integer + sums is hashed for a single comparison value. + """ + per_area = {} + for area in areas: + code = area.lower() + wpath = weight_dir / f"{code}_tmd_weights.csv.gz" + if not wpath.exists(): + continue + wdf = pd.read_csv(wpath) + wt_cols = [c for c in wdf.columns if c.startswith("WT")] + wt = wdf[wt_cols[0]].values + int_sum = int(np.round(wt).sum()) + per_area[area] = int_sum + + # Hash of all per-area integer sums + hash_str = "|".join(f"{a}:{per_area[a]}" for a in sorted(per_area.keys())) + hash_val = hashlib.sha256(hash_str.encode()).hexdigest()[:16] + + return { + "n_areas": len(per_area), + "weight_hash": hash_val, + "per_area_int_sums": per_area, + } + + +def _fingerprint_path(scope): + return FINGERPRINT_DIR / f"{scope}_fingerprint.json" + + +def _save_fingerprint(scope, fp): + FINGERPRINT_DIR.mkdir(parents=True, exist_ok=True) + path = _fingerprint_path(scope) + with open(path, "w", encoding="utf-8") as f: + json.dump(fp, f, indent=2, sort_keys=True) + return path + + +def _load_fingerprint(scope): + path = _fingerprint_path(scope) + if not path.exists(): + return None + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +@pytest.fixture +def update_mode(request): + return request.config.getoption("--update-fingerprint") + + +def _has_weight_files(weight_dir, areas): + for a in areas: + wpath = weight_dir / f"{a.lower()}_tmd_weights.csv.gz" + if wpath.exists(): + return True + return False + + +@pytest.mark.skipif( + not _has_weight_files(STATE_WEIGHT_DIR, ALL_STATES), + reason="No state weight files — run solve_weights first", +) +class TestStateFingerprint: + """Fingerprint tests for state weights.""" + + # pylint: disable=redefined-outer-name + def test_state_weights_match_reference(self, update_mode): + """Compare weight integer sums against saved reference.""" + current = _compute_fingerprint(ALL_STATES, STATE_WEIGHT_DIR) + + if update_mode: + path = _save_fingerprint("states", current) + pytest.skip(f"Saved to {path} — re-run to test") + + reference = _load_fingerprint("states") + if reference is None: + path = _save_fingerprint("states", current) + pytest.skip(f"No reference found. Saved to {path} — re-run") + + ref_n = reference["n_areas"] + cur_n = current["n_areas"] + assert cur_n == ref_n, f"Area count: {ref_n} -> {cur_n}" + + assert ( + current["weight_hash"] == reference["weight_hash"] + ), "Weight hash mismatch — results changed" + + def test_per_area_sums_match(self, update_mode): + """Identify which areas changed.""" + if update_mode: + pytest.skip("Update mode") + + reference = _load_fingerprint("states") + if reference is None: + pytest.skip("No reference fingerprint") + + current = _compute_fingerprint(ALL_STATES, STATE_WEIGHT_DIR) + ref_sums = reference.get("per_area_int_sums", {}) + cur_sums = current.get("per_area_int_sums", {}) + + mismatches = [] + for area in sorted(ref_sums.keys()): + if area not in cur_sums: + mismatches.append(f"{area}: missing") + continue + if ref_sums[area] != cur_sums[area]: + mismatches.append( + f"{area}: {ref_sums[area]}" f" -> {cur_sums[area]}" + ) + + assert ( + not mismatches + ), f"{len(mismatches)} areas changed:\n" + "\n".join(mismatches) From df6e4aac3bf20c1e127e275efa24bfabbc0286b2 Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Wed, 25 Mar 2026 09:17:32 -0400 Subject: [PATCH 07/10] Add state weight reference fingerprint for reproducibility testing Commit the states_fingerprint.json so reviewers can verify their solve results match: pytest tests/test_fingerprint.py -v Co-Authored-By: Claude Opus 4.6 (1M context) --- .../fingerprints/states_fingerprint.json | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 tmd/areas/fingerprints/states_fingerprint.json diff --git a/tmd/areas/fingerprints/states_fingerprint.json b/tmd/areas/fingerprints/states_fingerprint.json new file mode 100644 index 00000000..358bc2f9 --- /dev/null +++ b/tmd/areas/fingerprints/states_fingerprint.json @@ -0,0 +1,57 @@ +{ + "n_areas": 51, + "per_area_int_sums": { + "AK": 412466, + "AL": 2749392, + "AR": 1622704, + "AZ": 4130134, + "CA": 22167973, + "CO": 3442035, + "CT": 2131586, + "DC": 433116, + "DE": 585680, + "FL": 13026186, + "GA": 6008652, + "HI": 832970, + "IA": 1769383, + "ID": 1031419, + "IL": 7256718, + "IN": 3843732, + "KS": 1620038, + "KY": 2468402, + "LA": 2496679, + "MA": 4216198, + "MD": 3605225, + "ME": 833930, + "MI": 5795022, + "MN": 3279543, + "MO": 3447807, + "MS": 1568159, + "MT": 647039, + "NC": 5920637, + "ND": 435558, + "NE": 1084195, + "NH": 830201, + "NJ": 5351788, + "NM": 1217003, + "NV": 1843143, + "NY": 11692200, + "OH": 6776517, + "OK": 2146996, + "OR": 2459157, + "PA": 7525412, + "RI": 666732, + "SC": 2959393, + "SD": 506402, + "TN": 3891676, + "TX": 16241382, + "UT": 1769984, + "VA": 4920809, + "VT": 393735, + "WA": 4447067, + "WI": 3419741, + "WV": 989190, + "WY": 320883 + }, + "weight_hash": "8b36ae1c2ee0c384" +} \ No newline at end of file From 139eb8dccdd68c28dff4e1e83079a5644f2fbec1 Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Wed, 25 Mar 2026 11:33:57 -0400 Subject: [PATCH 08/10] Update PR roadmap: PR 2 now routes both states and CDs through spec pipeline Co-Authored-By: Claude Opus 4.6 (1M context) --- PR_MESSAGE.md | 118 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 PR_MESSAGE.md diff --git a/PR_MESSAGE.md b/PR_MESSAGE.md new file mode 100644 index 00000000..82f0a54c --- /dev/null +++ b/PR_MESSAGE.md @@ -0,0 +1,118 @@ +# PR: Improve area weight solver: robustness, memory, and testing + +> **This is PR 1 of 4** adding congressional district (CD) weighting to TMD. The +> PRs are stacked and should be reviewed/merged in order. None of the PRs changes +> national code or results. There are changes to Makefile to exclude certain new +> tests that relate to areas and may be best run on an as-needed basis. +> +> 1. **Solver robustness** (this PR) — This improves the area weight solution +> infrastructure so that it is more flexible and efficient. This provides +> modest benefits for state target creation and weight solution, and will +> provide substantial benefits when we prepare targets and solve weights for +> 436 Congressional Districts (including DC). The changes include memory +> reduction, feasibility checks, per-constraint penalties, and new tests. +> 2. **Spec-based target pipeline** — CSV-driven target specification, SOI CD +> data ingestion, geographic shares. Both states and CDs route through the +> new spec pipeline. State targets are regenerated via the new pipeline +> (verified identical results via fingerprint test). +> 3. **Quality report enhancements** — CD-aware reporting, violation summaries, +> human-readable labels. +> 4. **Congressional district pipeline** — CD solver integration, developer +> mode, override YAML, 436-CD batch solving. + +## Summary of this PR + +Improve the area weight QP solver with robustness enhancements, major memory +reductions, and new test coverage. These changes benefit states and, in the +future, congressional districts. + +**Solver robustness (3 improvements):** + +- **Range-based target filtering:** `_drop_impossible_targets()` now checks + whether each target is achievable within multiplier bounds, not just whether + the constraint matrix row is all zeros. Catches geometrically unreachable + targets that the old check missed. +- **LP feasibility pre-check:** New `_check_feasibility()` runs a fast linear + program (scipy HiGHS) before the QP to identify which constraints will need + slack. Runs on every area solve (not just development). Diagnostic only — + logs which constraints are tight but does not change solutions. +- **Per-constraint slack penalties:** New `_assign_slack_penalties()` gives + reduced penalty (1e3 vs 1e6) to inherently noisy targets: e02400/e00300/e26270 + amounts in low-AGI bins, and filing-status counts in the lowest bins. The + solver relaxes these targets in preference to distorting weights globally to + meet targets. + +**Memory reductions (3 changes, net -36% vs master):** + +The PR reduces memory usage, especially per-worker usage, to make it practical +to use more workers on multi-processor systems: + +- Build constraint matrix B directly in sparse COO format, eliminating two dense + intermediates (~620 MB saved per worker). +- Use sparse matrices in LP feasibility check (~1.2 GB saved per worker). +- Trim unused TMD DataFrame columns (109 to ~30) and preload TMD in the parent + process before forking workers (shared via copy-on-write). + +Peak memory per worker: **1,244 MB (master) reduced to 798 MB (this PR)**. With 16 +workers: ~20 GB reduced to ~13 GB. + +**New infrastructure:** + +- `solver_overrides.py`: YAML-based per-area solver parameter management. + Provides infrastructure for customizing solver settings (tolerance, multiplier + bounds, etc.) per area. No override files are included in this PR; actual + per-area overrides will be generated and committed in a later PR when the + congressional district pipeline is added. + +**New tests:** + +- `test_state_weight_results.py`: post-solve validation of state weight files + (existence, nonnegativity, no NaN, correct columns, target accuracy within + tolerance). Run as part of the test suite if weight files exist. +- `test_fingerprint.py`: on-demand reproducibility test. Rounds weights to + integers, sums per area, and hashes. Detects any change in results across runs + or machines. Not part of `make test`; run manually with `pytest + tests/test_fingerprint.py -v`. + +## Impact on state weights + +State weight results will change numerically due to per-constraint slack +penalties. This is expected and is an improvement — noisy low-AGI targets that +previously forced weight distortion across all records are now relaxed +preferentially. The constraint tolerance (0.5%) and multiplier bounds (0-25x) +are unchanged. + +## Files changed (9 files, +998 / -61) + +| File | Change | +| ------------------------------------ | ---------------------------------------------------------------------------------------------- | +| `tmd/areas/create_area_weights.py` | Sparse matrix construction, enhanced feasibility check, LP pre-check, per-constraint penalties | +| `tmd/areas/batch_weights.py` | Parent-process TMD preloading, override support, per-constraint penalties wiring | +| `tmd/areas/solver_overrides.py` | NEW — YAML-based per-area override management | +| `tmd/areas/solve_weights.py` | Minor CLI scope fix | +| `tmd/areas/quality_report.py` | Fix --scope states CLI parsing | +| `tests/test_state_weight_results.py` | NEW — post-solve state weight validation | +| `tests/test_fingerprint.py` | NEW — on-demand reproducibility test | +| `tests/conftest.py` | Add --update-fingerprint CLI option | +| `Makefile` | Exclude fingerprint test from `make test` | + +## Test plan + +```bash +make format # no changes +make lint # passes clean +make clean && make data # build TMD + run all tests +python -m tmd.areas.prepare_targets --scope states # generate state target files +python -m pytest tests/test_prepare_targets.py -v # verify targets +python -m tmd.areas.solve_weights --scope states --workers 16 # solve state weights +python -m pytest tests/test_state_weight_results.py -v # verify weights +python -m tmd.areas.quality_report --scope states # quality report +pytest tests/test_fingerprint.py -v --update-fingerprint # save fingerprint +``` + +## Reproducibility + +Verified: 8-worker and 16-worker solves produce identical fingerprints (hash +`8b36ae1c2ee0c384`, integer weight sums match per area exactly). + +Prepared by @donboyd5 and [Claude Code](https://claude.com/claude-code) From eacea231f82a5217d6b2016b60bf34af26b22f84 Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Wed, 25 Mar 2026 14:08:43 -0400 Subject: [PATCH 09/10] =?UTF-8?q?Remove=20PR=5FMESSAGE.md=20from=20repo=20?= =?UTF-8?q?=E2=80=94=20content=20is=20in=20the=20PR=20description?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- PR_MESSAGE.md | 118 -------------------------------------------------- 1 file changed, 118 deletions(-) delete mode 100644 PR_MESSAGE.md diff --git a/PR_MESSAGE.md b/PR_MESSAGE.md deleted file mode 100644 index 82f0a54c..00000000 --- a/PR_MESSAGE.md +++ /dev/null @@ -1,118 +0,0 @@ -# PR: Improve area weight solver: robustness, memory, and testing - -> **This is PR 1 of 4** adding congressional district (CD) weighting to TMD. The -> PRs are stacked and should be reviewed/merged in order. None of the PRs changes -> national code or results. There are changes to Makefile to exclude certain new -> tests that relate to areas and may be best run on an as-needed basis. -> -> 1. **Solver robustness** (this PR) — This improves the area weight solution -> infrastructure so that it is more flexible and efficient. This provides -> modest benefits for state target creation and weight solution, and will -> provide substantial benefits when we prepare targets and solve weights for -> 436 Congressional Districts (including DC). The changes include memory -> reduction, feasibility checks, per-constraint penalties, and new tests. -> 2. **Spec-based target pipeline** — CSV-driven target specification, SOI CD -> data ingestion, geographic shares. Both states and CDs route through the -> new spec pipeline. State targets are regenerated via the new pipeline -> (verified identical results via fingerprint test). -> 3. **Quality report enhancements** — CD-aware reporting, violation summaries, -> human-readable labels. -> 4. **Congressional district pipeline** — CD solver integration, developer -> mode, override YAML, 436-CD batch solving. - -## Summary of this PR - -Improve the area weight QP solver with robustness enhancements, major memory -reductions, and new test coverage. These changes benefit states and, in the -future, congressional districts. - -**Solver robustness (3 improvements):** - -- **Range-based target filtering:** `_drop_impossible_targets()` now checks - whether each target is achievable within multiplier bounds, not just whether - the constraint matrix row is all zeros. Catches geometrically unreachable - targets that the old check missed. -- **LP feasibility pre-check:** New `_check_feasibility()` runs a fast linear - program (scipy HiGHS) before the QP to identify which constraints will need - slack. Runs on every area solve (not just development). Diagnostic only — - logs which constraints are tight but does not change solutions. -- **Per-constraint slack penalties:** New `_assign_slack_penalties()` gives - reduced penalty (1e3 vs 1e6) to inherently noisy targets: e02400/e00300/e26270 - amounts in low-AGI bins, and filing-status counts in the lowest bins. The - solver relaxes these targets in preference to distorting weights globally to - meet targets. - -**Memory reductions (3 changes, net -36% vs master):** - -The PR reduces memory usage, especially per-worker usage, to make it practical -to use more workers on multi-processor systems: - -- Build constraint matrix B directly in sparse COO format, eliminating two dense - intermediates (~620 MB saved per worker). -- Use sparse matrices in LP feasibility check (~1.2 GB saved per worker). -- Trim unused TMD DataFrame columns (109 to ~30) and preload TMD in the parent - process before forking workers (shared via copy-on-write). - -Peak memory per worker: **1,244 MB (master) reduced to 798 MB (this PR)**. With 16 -workers: ~20 GB reduced to ~13 GB. - -**New infrastructure:** - -- `solver_overrides.py`: YAML-based per-area solver parameter management. - Provides infrastructure for customizing solver settings (tolerance, multiplier - bounds, etc.) per area. No override files are included in this PR; actual - per-area overrides will be generated and committed in a later PR when the - congressional district pipeline is added. - -**New tests:** - -- `test_state_weight_results.py`: post-solve validation of state weight files - (existence, nonnegativity, no NaN, correct columns, target accuracy within - tolerance). Run as part of the test suite if weight files exist. -- `test_fingerprint.py`: on-demand reproducibility test. Rounds weights to - integers, sums per area, and hashes. Detects any change in results across runs - or machines. Not part of `make test`; run manually with `pytest - tests/test_fingerprint.py -v`. - -## Impact on state weights - -State weight results will change numerically due to per-constraint slack -penalties. This is expected and is an improvement — noisy low-AGI targets that -previously forced weight distortion across all records are now relaxed -preferentially. The constraint tolerance (0.5%) and multiplier bounds (0-25x) -are unchanged. - -## Files changed (9 files, +998 / -61) - -| File | Change | -| ------------------------------------ | ---------------------------------------------------------------------------------------------- | -| `tmd/areas/create_area_weights.py` | Sparse matrix construction, enhanced feasibility check, LP pre-check, per-constraint penalties | -| `tmd/areas/batch_weights.py` | Parent-process TMD preloading, override support, per-constraint penalties wiring | -| `tmd/areas/solver_overrides.py` | NEW — YAML-based per-area override management | -| `tmd/areas/solve_weights.py` | Minor CLI scope fix | -| `tmd/areas/quality_report.py` | Fix --scope states CLI parsing | -| `tests/test_state_weight_results.py` | NEW — post-solve state weight validation | -| `tests/test_fingerprint.py` | NEW — on-demand reproducibility test | -| `tests/conftest.py` | Add --update-fingerprint CLI option | -| `Makefile` | Exclude fingerprint test from `make test` | - -## Test plan - -```bash -make format # no changes -make lint # passes clean -make clean && make data # build TMD + run all tests -python -m tmd.areas.prepare_targets --scope states # generate state target files -python -m pytest tests/test_prepare_targets.py -v # verify targets -python -m tmd.areas.solve_weights --scope states --workers 16 # solve state weights -python -m pytest tests/test_state_weight_results.py -v # verify weights -python -m tmd.areas.quality_report --scope states # quality report -pytest tests/test_fingerprint.py -v --update-fingerprint # save fingerprint -``` - -## Reproducibility - -Verified: 8-worker and 16-worker solves produce identical fingerprints (hash -`8b36ae1c2ee0c384`, integer weight sums match per area exactly). - -Prepared by @donboyd5 and [Claude Code](https://claude.com/claude-code) From 13025ecc893be1f9ad66c529612c58a0c0e903ea Mon Sep 17 00:00:00 2001 From: Don Boyd Date: Wed, 25 Mar 2026 15:00:15 -0400 Subject: [PATCH 10/10] Apply per-constraint slack penalties only for CDs, not states MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit States are well-conditioned — the solver hits all targets with uniform penalties. Per-constraint penalties were designed for CDs where extreme areas need relaxation. For states, they cause more targets to land on the wrong side of the 0.50% tolerance boundary without changing the actual weights (RMSE, objective, multipliers all identical to 6+ digits). Co-Authored-By: Claude Opus 4.6 (1M context) --- tmd/areas/batch_weights.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/tmd/areas/batch_weights.py b/tmd/areas/batch_weights.py index 09f7aff0..a5766b68 100644 --- a/tmd/areas/batch_weights.py +++ b/tmd/areas/batch_weights.py @@ -204,16 +204,23 @@ def _solve_one_area(area): out=out, ) - # Assign per-constraint slack penalties - per_constraint_penalties = _assign_slack_penalties( - labels, default_penalty=slack_penalty - ) - n_reduced = int((per_constraint_penalties < slack_penalty).sum()) - if n_reduced > 0: - out.write( - f"SLACK PENALTIES: {n_reduced}/{len(labels)}" - f" constraints have reduced penalty\n" + # Assign per-constraint slack penalties for CDs only. + # States are well-conditioned and hit all targets with uniform + # penalties. CDs have extreme areas (e.g., Manhattan) where + # reduced penalties on noisy low-AGI targets prevent the solver + # from distorting weights globally to satisfy them. + if multiplier_max > AREA_MULTIPLIER_MAX: + per_constraint_penalties = _assign_slack_penalties( + labels, default_penalty=slack_penalty ) + n_reduced = int((per_constraint_penalties < slack_penalty).sum()) + if n_reduced > 0: + out.write( + f"SLACK PENALTIES: {n_reduced}/{len(labels)}" + f" constraints have reduced penalty\n" + ) + else: + per_constraint_penalties = None # Check for per-record multiplier caps (from exhaustion limiting) caps_path = wgt_dir / f"{area}_record_caps.npy"