diff --git a/Makefile b/Makefile index 8fe3f73e..381cb3e5 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ tmd_files: tmd/storage/output/tmd.csv.gz \ .PHONY=test test: tmd_files - pytest . -v -n4 --ignore=tests/national_targets_pipeline + pytest . -v -n4 --ignore=tests/national_targets_pipeline --ignore=tests/test_fingerprint.py .PHONY=data data: install tmd_files test diff --git a/tests/conftest.py b/tests/conftest.py index c86cffce..6f54518b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,15 @@ np.seterr(all="raise") +def pytest_addoption(parser): + parser.addoption( + "--update-fingerprint", + action="store_true", + default=False, + help="Save current results as reference fingerprint", + ) + + def create_tmd_records( data_path, weights_path, growfactors_path, exact_calculations=True ): diff --git a/tests/test_fingerprint.py b/tests/test_fingerprint.py new file mode 100644 index 00000000..9eae4ae6 --- /dev/null +++ b/tests/test_fingerprint.py @@ -0,0 +1,202 @@ +""" +On-demand fingerprint test for area weight results. + +NOT run by `make test` (excluded in Makefile). +Run manually after a full solve: + + pytest tests/test_fingerprint.py -v + pytest tests/test_fingerprint.py -v --update-fingerprint + +The first run saves a reference fingerprint. +Subsequent runs compare against it. + +Fingerprint method: for each area, round weights to nearest integer, +sum them, and hash the per-area sums. This is simple, fast, and +catches any meaningful change in results. +""" + +import hashlib +import json + +import numpy as np +import pandas as pd +import pytest + +from tmd.areas import AREAS_FOLDER + +FINGERPRINT_DIR = AREAS_FOLDER / "fingerprints" +STATE_WEIGHT_DIR = AREAS_FOLDER / "weights" / "states" + +ALL_STATES = [ + "AL", + "AK", + "AZ", + "AR", + "CA", + "CO", + "CT", + "DC", + "DE", + "FL", + "GA", + "HI", + "ID", + "IL", + "IN", + "IA", + "KS", + "KY", + "LA", + "ME", + "MD", + "MA", + "MI", + "MN", + "MS", + "MO", + "MT", + "NE", + "NV", + "NH", + "NJ", + "NM", + "NY", + "NC", + "ND", + "OH", + "OK", + "OR", + "PA", + "RI", + "SC", + "SD", + "TN", + "TX", + "UT", + "VT", + "VA", + "WA", + "WV", + "WI", + "WY", +] + + +def _compute_fingerprint(areas, weight_dir): + """Compute fingerprint from weight files. + + For each area, reads the first WT column, rounds weights to + nearest integer, and records the sum. The collection of integer + sums is hashed for a single comparison value. + """ + per_area = {} + for area in areas: + code = area.lower() + wpath = weight_dir / f"{code}_tmd_weights.csv.gz" + if not wpath.exists(): + continue + wdf = pd.read_csv(wpath) + wt_cols = [c for c in wdf.columns if c.startswith("WT")] + wt = wdf[wt_cols[0]].values + int_sum = int(np.round(wt).sum()) + per_area[area] = int_sum + + # Hash of all per-area integer sums + hash_str = "|".join(f"{a}:{per_area[a]}" for a in sorted(per_area.keys())) + hash_val = hashlib.sha256(hash_str.encode()).hexdigest()[:16] + + return { + "n_areas": len(per_area), + "weight_hash": hash_val, + "per_area_int_sums": per_area, + } + + +def _fingerprint_path(scope): + return FINGERPRINT_DIR / f"{scope}_fingerprint.json" + + +def _save_fingerprint(scope, fp): + FINGERPRINT_DIR.mkdir(parents=True, exist_ok=True) + path = _fingerprint_path(scope) + with open(path, "w", encoding="utf-8") as f: + json.dump(fp, f, indent=2, sort_keys=True) + return path + + +def _load_fingerprint(scope): + path = _fingerprint_path(scope) + if not path.exists(): + return None + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +@pytest.fixture +def update_mode(request): + return request.config.getoption("--update-fingerprint") + + +def _has_weight_files(weight_dir, areas): + for a in areas: + wpath = weight_dir / f"{a.lower()}_tmd_weights.csv.gz" + if wpath.exists(): + return True + return False + + +@pytest.mark.skipif( + not _has_weight_files(STATE_WEIGHT_DIR, ALL_STATES), + reason="No state weight files — run solve_weights first", +) +class TestStateFingerprint: + """Fingerprint tests for state weights.""" + + # pylint: disable=redefined-outer-name + def test_state_weights_match_reference(self, update_mode): + """Compare weight integer sums against saved reference.""" + current = _compute_fingerprint(ALL_STATES, STATE_WEIGHT_DIR) + + if update_mode: + path = _save_fingerprint("states", current) + pytest.skip(f"Saved to {path} — re-run to test") + + reference = _load_fingerprint("states") + if reference is None: + path = _save_fingerprint("states", current) + pytest.skip(f"No reference found. Saved to {path} — re-run") + + ref_n = reference["n_areas"] + cur_n = current["n_areas"] + assert cur_n == ref_n, f"Area count: {ref_n} -> {cur_n}" + + assert ( + current["weight_hash"] == reference["weight_hash"] + ), "Weight hash mismatch — results changed" + + def test_per_area_sums_match(self, update_mode): + """Identify which areas changed.""" + if update_mode: + pytest.skip("Update mode") + + reference = _load_fingerprint("states") + if reference is None: + pytest.skip("No reference fingerprint") + + current = _compute_fingerprint(ALL_STATES, STATE_WEIGHT_DIR) + ref_sums = reference.get("per_area_int_sums", {}) + cur_sums = current.get("per_area_int_sums", {}) + + mismatches = [] + for area in sorted(ref_sums.keys()): + if area not in cur_sums: + mismatches.append(f"{area}: missing") + continue + if ref_sums[area] != cur_sums[area]: + mismatches.append( + f"{area}: {ref_sums[area]}" f" -> {cur_sums[area]}" + ) + + assert ( + not mismatches + ), f"{len(mismatches)} areas changed:\n" + "\n".join(mismatches) diff --git a/tests/test_state_weight_results.py b/tests/test_state_weight_results.py new file mode 100644 index 00000000..1ade71c4 --- /dev/null +++ b/tests/test_state_weight_results.py @@ -0,0 +1,163 @@ +""" +Post-solve validation of state weight files. + +These tests verify that the actual state weight outputs are valid. +They are skipped if weight files have not been generated yet +(i.e., solve_weights --scope states has not been run). + +Run after: + python -m tmd.areas.prepare_targets --scope states + python -m tmd.areas.solve_weights --scope states --workers 8 +""" + +import io +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +from tmd.areas.create_area_weights import ( + AREA_CONSTRAINT_TOL, + STATE_TARGET_DIR, + STATE_WEIGHT_DIR, + _build_constraint_matrix, + _drop_impossible_targets, + _load_taxcalc_data, +) +from tmd.areas.prepare.constants import ALL_STATES +from tmd.imputation_assumptions import TAXYEAR + +# Skip entire module if weight files haven't been generated +_WEIGHT_FILES = list(STATE_WEIGHT_DIR.glob("*_tmd_weights.csv.gz")) +pytestmark = pytest.mark.skipif( + len(_WEIGHT_FILES) < 51, + reason="State weight files not generated yet", +) + +# Also need cached data files for target accuracy checks +_CACHED = Path(__file__).parent.parent / "tmd" / "storage" / "output" +_HAS_CACHED = (_CACHED / "tmd.csv.gz").exists() and ( + _CACHED / "cached_c00100.npy" +).exists() + + +class TestStateWeightFiles: + """Basic validity checks on all 51 state weight files.""" + + def test_all_states_have_weight_files(self): + """Every state has a weight file.""" + for st in ALL_STATES: + wpath = STATE_WEIGHT_DIR / f"{st.lower()}_tmd_weights.csv.gz" + assert wpath.exists(), f"Missing weight file for {st}" + + def test_all_states_have_log_files(self): + """Every state has a solver log.""" + for st in ALL_STATES: + logpath = STATE_WEIGHT_DIR / f"{st.lower()}.log" + assert logpath.exists(), f"Missing log file for {st}" + + def test_weight_columns(self): + """Weight files have expected year columns.""" + wpath = STATE_WEIGHT_DIR / "mn_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + expected = [f"WT{yr}" for yr in range(TAXYEAR, 2035)] + assert list(wdf.columns) == expected + + def test_weight_row_count(self): + """Weight files have one row per TMD record.""" + wpath = STATE_WEIGHT_DIR / "mn_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + # Should match TMD record count (215,494 for 2022) + assert len(wdf) > 200_000 + + @pytest.mark.parametrize( + "state", + [s.lower() for s in ALL_STATES], + ) + def test_weights_nonnegative(self, state): + """All weights are non-negative.""" + wpath = STATE_WEIGHT_DIR / f"{state}_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + assert (wdf >= 0).all().all(), f"{state}: negative weights found" + + @pytest.mark.parametrize( + "state", + [s.lower() for s in ALL_STATES], + ) + def test_weights_no_nan(self, state): + """No NaN or inf values in weights.""" + wpath = STATE_WEIGHT_DIR / f"{state}_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + assert not wdf.isna().any().any(), f"{state}: NaN values found" + assert np.isfinite(wdf.values).all(), f"{state}: inf values found" + + @pytest.mark.parametrize( + "state", + [s.lower() for s in ALL_STATES], + ) + def test_solver_status_solved(self, state): + """Solver log reports Solved status.""" + logpath = STATE_WEIGHT_DIR / f"{state}.log" + log_text = logpath.read_text() + assert ( + "Solver status: Solved" in log_text + ), f"{state}: solver did not report Solved" + + +@pytest.mark.skipif( + not _HAS_CACHED, + reason="Cached TMD data files not available", +) +class TestStateTargetAccuracy: + """Verify weighted sums hit targets within tolerance.""" + + @pytest.fixture(scope="class") + def vdf(self): + """Load TMD data once for all accuracy tests.""" + return _load_taxcalc_data() + + @pytest.mark.parametrize( + "state", + ["al", "ca", "mn", "ny", "tx"], + ) + def test_targets_hit(self, vdf, state): + """Weighted sums match targets within constraint tolerance.""" + out = io.StringIO() + B_csc, targets, labels, pop_share = _build_constraint_matrix( + state, + vdf, + out, + target_dir=STATE_TARGET_DIR, + ) + B_csc, targets, labels = _drop_impossible_targets( + B_csc, + targets, + labels, + out, + ) + + # Load weights and compute multipliers + wpath = STATE_WEIGHT_DIR / f"{state}_tmd_weights.csv.gz" + wdf = pd.read_csv(wpath) + area_weights = wdf[f"WT{TAXYEAR}"].values + w0 = pop_share * vdf["s006"].values + # Avoid division by zero for zero-weight records + safe_w0 = np.where(w0 > 0, w0, 1.0) + x = area_weights / safe_w0 + x = np.where(w0 > 0, x, 0.0) + + # Check target accuracy + achieved = np.asarray(B_csc @ x).ravel() + rel_errors = np.abs(achieved - targets) / np.maximum( + np.abs(targets), 1.0 + ) + # Allow small margin above solver tolerance for floating-point + # differences between solver internals and weight-file roundtrip + eps = 1e-4 + n_violated = int((rel_errors > AREA_CONSTRAINT_TOL + eps).sum()) + max_err = rel_errors.max() + assert n_violated == 0, ( + f"{state}: {n_violated} targets violated, " + f"max error = {max_err * 100:.3f}%" + ) diff --git a/tmd/areas/batch_weights.py b/tmd/areas/batch_weights.py index 9a5c1f45..a5766b68 100644 --- a/tmd/areas/batch_weights.py +++ b/tmd/areas/batch_weights.py @@ -22,22 +22,34 @@ import numpy as np import pandas as pd import yaml - -# Module-level cache for TMD data (one per worker process) +from scipy.sparse import csc_matrix + +# Module-level globals shared across workers via fork copy-on-write. +# +# _WORKER_VDF and _WORKER_POP are loaded ONCE in the parent process +# (by _preload_data) before ProcessPoolExecutor forks workers. +# On Linux, forked children inherit these pages read-only; the OS +# shares the physical memory until a child writes to it. This saves +# ~190 MB × (num_workers - 1) compared to loading per-worker. +# +# The remaining globals are per-worker configuration set by +# _init_worker (called as the executor's initializer). _WORKER_VDF = None _WORKER_POP = None _WORKER_TARGET_DIR = None _WORKER_WEIGHT_DIR = None +_WORKER_MULTIPLIER_MAX = None +_WORKER_OVERRIDES = None + +def _preload_data(): + """Load TMD data in the parent process before forking workers. -def _init_worker(target_dir=None, weight_dir=None): - """Load TMD data once per worker process.""" + On Linux (fork), child processes share the parent's memory + pages copy-on-write. Loading here instead of per-worker avoids + duplicating ~190 MB × N workers. + """ global _WORKER_VDF, _WORKER_POP - global _WORKER_TARGET_DIR, _WORKER_WEIGHT_DIR - if target_dir is not None: - _WORKER_TARGET_DIR = Path(target_dir) - if weight_dir is not None: - _WORKER_WEIGHT_DIR = Path(weight_dir) if _WORKER_VDF is not None: return from tmd.areas.create_area_weights import ( @@ -50,6 +62,34 @@ def _init_worker(target_dir=None, weight_dir=None): _WORKER_POP = yaml.safe_load(pf.read()) +def _init_worker( + target_dir=None, + weight_dir=None, + multiplier_max=None, + override_path=None, +): + """Initialize per-worker state (directories, overrides). + + TMD data is already loaded in the parent via _preload_data() + and shared copy-on-write through fork. + """ + global _WORKER_TARGET_DIR, _WORKER_WEIGHT_DIR + global _WORKER_MULTIPLIER_MAX, _WORKER_OVERRIDES + if target_dir is not None: + _WORKER_TARGET_DIR = Path(target_dir) + if weight_dir is not None: + _WORKER_WEIGHT_DIR = Path(weight_dir) + if multiplier_max is not None: + _WORKER_MULTIPLIER_MAX = multiplier_max + if override_path is not None: + from tmd.areas.solver_overrides import load_overrides + + _WORKER_OVERRIDES = load_overrides(Path(override_path)) + # Fallback: load data if not preloaded (e.g. single-area solve) + if _WORKER_VDF is None: + _preload_data() + + def _solve_one_area(area): """ Solve area weights for one area using cached worker data. @@ -65,7 +105,9 @@ def _solve_one_area(area): AREA_SLACK_PENALTY, FIRST_YEAR, LAST_YEAR, + _assign_slack_penalties, _build_constraint_matrix, + _check_feasibility, _drop_impossible_targets, _print_multiplier_diagnostics, _print_slack_diagnostics, @@ -82,6 +124,29 @@ def _solve_one_area(area): tgt_dir = _WORKER_TARGET_DIR wgt_dir = _WORKER_WEIGHT_DIR params = _read_params(area, out, target_dir=tgt_dir) + + # Apply solver overrides (centralized file takes precedence) + if _WORKER_OVERRIDES is not None: + from tmd.areas.solver_overrides import ( + get_area_overrides, + get_drop_targets, + ) + + area_ovr = get_area_overrides(_WORKER_OVERRIDES, area) + # Override params with centralized overrides + for key in ( + "constraint_tol", + "slack_penalty", + "max_iter", + "multiplier_min", + "multiplier_max", + ): + if key in area_ovr: + params[key] = area_ovr[key] + drop_patterns = get_drop_targets(_WORKER_OVERRIDES, area) + else: + drop_patterns = [] + constraint_tol = params.get( "constraint_tol", params.get("target_ratio_tolerance", AREA_CONSTRAINT_TOL), @@ -89,17 +154,74 @@ def _solve_one_area(area): slack_penalty = params.get("slack_penalty", AREA_SLACK_PENALTY) max_iter = params.get("max_iter", AREA_MAX_ITER) multiplier_min = params.get("multiplier_min", AREA_MULTIPLIER_MIN) - multiplier_max = params.get("multiplier_max", AREA_MULTIPLIER_MAX) + default_max = ( + _WORKER_MULTIPLIER_MAX + if _WORKER_MULTIPLIER_MAX is not None + else AREA_MULTIPLIER_MAX + ) + multiplier_max = params.get("multiplier_max", default_max) B_csc, targets, labels, pop_share = _build_constraint_matrix( area, vdf, out, target_dir=tgt_dir ) B_csc, targets, labels = _drop_impossible_targets( - B_csc, targets, labels, out + B_csc, + targets, + labels, + out, + multiplier_max=multiplier_max, + constraint_tol=constraint_tol, ) + # Apply per-area target drops from override file + if drop_patterns: + from tmd.areas.solver_overrides import should_drop_target + + keep = [not should_drop_target(lbl, drop_patterns) for lbl in labels] + n_drop = sum(1 for k in keep if not k) + if n_drop > 0: + keep_arr = np.array(keep) + B_dense = B_csc.toarray() + B_csc = csc_matrix(B_dense[keep_arr, :]) + targets = targets[keep_arr] + labels = [lbl2 for lbl2, k in zip(labels, keep) if k] + out.write( + f"OVERRIDE: dropped {n_drop} targets," + f" {len(targets)} remaining\n" + ) + n_records = B_csc.shape[1] + # Pre-solve feasibility check + _check_feasibility( + B_csc, + targets, + labels, + n_records, + constraint_tol=constraint_tol, + multiplier_min=multiplier_min, + multiplier_max=multiplier_max, + out=out, + ) + + # Assign per-constraint slack penalties for CDs only. + # States are well-conditioned and hit all targets with uniform + # penalties. CDs have extreme areas (e.g., Manhattan) where + # reduced penalties on noisy low-AGI targets prevent the solver + # from distorting weights globally to satisfy them. + if multiplier_max > AREA_MULTIPLIER_MAX: + per_constraint_penalties = _assign_slack_penalties( + labels, default_penalty=slack_penalty + ) + n_reduced = int((per_constraint_penalties < slack_penalty).sum()) + if n_reduced > 0: + out.write( + f"SLACK PENALTIES: {n_reduced}/{len(labels)}" + f" constraints have reduced penalty\n" + ) + else: + per_constraint_penalties = None + # Check for per-record multiplier caps (from exhaustion limiting) caps_path = wgt_dir / f"{area}_record_caps.npy" if caps_path.exists(): @@ -117,7 +239,7 @@ def _solve_one_area(area): labels, n_records, constraint_tol=constraint_tol, - slack_penalty=slack_penalty, + slack_penalties=per_constraint_penalties, max_iter=max_iter, multiplier_min=multiplier_min, multiplier_max=multiplier_max, @@ -207,6 +329,8 @@ def run_batch( force=False, target_dir=None, weight_dir=None, + multiplier_max=None, + override_path=None, ): """ Run area weight optimization for multiple areas in parallel. @@ -272,9 +396,11 @@ def run_batch( - 1 ) # subtract header n = len(areas) + max_total = n * n_targets print( f"Processing {n} areas" - f" (up to {n_targets} targets each)" + f" (up to {n_targets} targets each," + f" {max_total:,} max total)" f" with {num_workers} workers..." ) print( @@ -285,6 +411,10 @@ def run_batch( # Ensure weights directory exists weight_dir.mkdir(parents=True, exist_ok=True) + # Load TMD data in parent before forking — workers share via + # copy-on-write, saving ~190 MB × (num_workers - 1). + _preload_data() + t_start = time.time() completed = 0 violated_areas = [] @@ -294,7 +424,12 @@ def run_batch( with ProcessPoolExecutor( max_workers=num_workers, initializer=_init_worker, - initargs=(str(target_dir), str(weight_dir)), + initargs=( + str(target_dir), + str(weight_dir), + multiplier_max, + str(override_path) if override_path else None, + ), ) as executor: futures = { executor.submit(_solve_one_area, area): area for area in areas @@ -328,7 +463,11 @@ def run_batch( sys.stdout.write("\n") total = time.time() - t_start - print(f"\nCompleted {completed}/{n} areas in {total:.1f}s") + m, s = divmod(total, 60) + print( + f"\nCompleted {completed}/{n} areas in" + f" {total:.1f}s ({int(m)}m {s:.0f}s)" + ) if violated_areas: total_viol = sum(v for _, v in violated_areas) print( diff --git a/tmd/areas/create_area_weights.py b/tmd/areas/create_area_weights.py index f50cb35e..b774f9f1 100644 --- a/tmd/areas/create_area_weights.py +++ b/tmd/areas/create_area_weights.py @@ -28,12 +28,15 @@ import sys import time +import re + import clarabel import numpy as np import pandas as pd import yaml +from scipy.optimize import linprog from scipy.sparse import ( - csc_matrix, + coo_matrix, diags as spdiags, eye as speye, hstack, @@ -65,17 +68,25 @@ # Default solver parameters AREA_CONSTRAINT_TOL = 0.005 AREA_SLACK_PENALTY = 1e6 +AREA_REDUCED_SLACK_PENALTY = 1e3 AREA_MAX_ITER = 2000 AREA_MULTIPLIER_MIN = 0.0 AREA_MULTIPLIER_MAX = 25.0 +CD_MULTIPLIER_MAX = 50.0 -# Default target/weight directories for states +# Default target/weight directories STATE_TARGET_DIR = AREAS_FOLDER / "targets" / "states" STATE_WEIGHT_DIR = AREAS_FOLDER / "weights" / "states" +CD_TARGET_DIR = AREAS_FOLDER / "targets" / "cds" +CD_WEIGHT_DIR = AREAS_FOLDER / "weights" / "cds" def _load_taxcalc_data(): - """Load TMD data with cached AGI and selected Tax-Calculator outputs.""" + """Load TMD data with cached AGI and selected Tax-Calculator outputs. + + After loading, drops columns not used by the solver to reduce + per-worker memory (~150 MB savings with 109 → ~25 columns). + """ vdf = pd.read_csv(INFILE_PATH) vdf["c00100"] = np.load(TAXCALC_AGI_CACHE) if CACHED_ALLVARS_PATH.exists(): @@ -87,6 +98,19 @@ def _load_taxcalc_data(): if "p22250" in vdf.columns and "p23250" in vdf.columns: vdf["capgains_net"] = vdf["p22250"] + vdf["p23250"] assert np.all(vdf.s006 > 0), "Not all weights are positive" + # Drop columns not used by the solver to reduce memory. + # Keep infrastructure columns + any column that could be a target + # variable (e*, c*, p* prefixes, plus synthetic variables). + # This adapts automatically as target specs change. + _INFRA_COLS = {"s006", "MARS", "data_source", "XTOT", "RECID"} + _SYNTH_COLS = {"capgains_net", "eitc", "ctc_total"} + keep = set() + for col in vdf.columns: + if col in _INFRA_COLS or col in _SYNTH_COLS: + keep.add(col) + elif col[:1] in ("e", "c", "p") and col[1:2].isdigit(): + keep.add(col) + vdf = vdf[sorted(keep)] return vdf @@ -145,10 +169,29 @@ def _build_constraint_matrix(area, vardf, out, target_dir=None): targets_file = target_dir / f"{area}_targets.csv" tdf = pd.read_csv(targets_file, comment="#") + # Pre-cache column arrays to avoid repeated DataFrame lookups + c00100_arr = vardf["c00100"].values.astype(float) + mars_arr = vardf["MARS"].values.astype(float) + data_source_arr = vardf["data_source"].values + scope1_mask = (data_source_arr == 1).astype(float) + scope2_mask = (data_source_arr == 0).astype(float) + ones_arr = np.ones(numobs, dtype=float) + + unique_vars = tdf.loc[tdf["count"] != 1, "varname"].unique() + var_cache = {} + for vname in unique_vars: + var_cache[vname] = vardf[vname].astype(float).values + targets_list = [] labels_list = [] - columns = [] pop_share = None + w0 = None + + # Build B matrix directly in sparse COO format, avoiding dense + # intermediates that would require ~620 MB for 215K × 179. + row_indices = [] + col_indices = [] + data_values = [] for row_idx, row in enumerate(tdf.itertuples(index=False)): line = f"{area}:target{row_idx + 1}" @@ -179,78 +222,316 @@ def _build_constraint_matrix(area, vardf, out, target_dir=None): f" / {national_population:.0f}" f" = {pop_share:.6f}\n" ) + w0 = pop_share * vardf.s006.values - # construct variable array for this target + # construct variable array from cache assert 0 <= row.count <= 4, f"count {row.count} not in [0,4] on {line}" if row.count == 0: - var_array = vardf[row.varname].astype(float).values + var_array = var_cache[row.varname] elif row.count == 1: - var_array = np.ones(numobs, dtype=float) + var_array = ones_arr elif row.count == 2: - var_array = (vardf[row.varname] != 0).astype(float).values + var_array = (var_cache[row.varname] != 0).astype(float) elif row.count == 3: - var_array = (vardf[row.varname] > 0).astype(float).values + var_array = (var_cache[row.varname] > 0).astype(float) else: - var_array = (vardf[row.varname] < 0).astype(float).values + var_array = (var_cache[row.varname] < 0).astype(float) - # construct mask - mask = np.ones(numobs, dtype=float) + # construct mask using cached arrays assert 0 <= row.scope <= 2, f"scope {row.scope} not in [0,2] on {line}" if row.scope == 1: - mask *= (vardf.data_source == 1).values.astype(float) + mask = scope1_mask.copy() elif row.scope == 2: - mask *= (vardf.data_source == 0).values.astype(float) + mask = scope2_mask.copy() + else: + mask = ones_arr.copy() - in_agi_bin = (vardf.c00100 >= row.agilo) & (vardf.c00100 < row.agihi) - mask *= in_agi_bin.values.astype(float) + mask *= (c00100_arr >= row.agilo) & (c00100_arr < row.agihi) assert ( 0 <= row.fstatus <= 5 ), f"fstatus {row.fstatus} not in [0,5] on {line}" if row.fstatus > 0: - mask *= (vardf.MARS == row.fstatus).values.astype(float) + mask *= mars_arr == row.fstatus - # A[j,i] = mask * var_array (data values for this constraint) - columns.append(mask * var_array) + # B[j,i] = w0[i] * mask[i] * var_array[i] + # Store only nonzero entries (sparse COO format) + b_row = w0 * mask * var_array + nz = np.nonzero(b_row)[0] + if len(nz) > 0: + row_indices.append(np.full(len(nz), row_idx, dtype=np.int32)) + col_indices.append(nz.astype(np.int32)) + data_values.append(b_row[nz]) assert pop_share is not None, "XTOT target not found" - # A matrix: n_records x n_targets (each column is a constraint) - A_dense = np.column_stack(columns) - - # B = diag(w0) @ A where w0 = pop_share * s006 - w0 = pop_share * vardf.s006.values - B_dense = w0[:, np.newaxis] * A_dense - - # convert to sparse (n_targets x n_records) for Clarabel - B_csc = csc_matrix(B_dense.T) + # Assemble sparse B directly from COO — no dense intermediate + n_targets = len(targets_list) + B_csc = coo_matrix( + ( + np.concatenate(data_values), + (np.concatenate(row_indices), np.concatenate(col_indices)), + ), + shape=(n_targets, numobs), + ).tocsc() targets_arr = np.array(targets_list) return B_csc, targets_arr, labels_list, pop_share -def _drop_impossible_targets(B_csc, targets, labels, out): - """Drop targets where all constraint matrix values are zero.""" - col_sums = np.abs(np.asarray(B_csc.sum(axis=1))).ravel() - all_zero = col_sums == 0 - if all_zero.any(): - n_drop = int(all_zero.sum()) - for i in np.where(all_zero)[0]: +def _drop_impossible_targets( + B_csc, + targets, + labels, + out, + multiplier_min=AREA_MULTIPLIER_MIN, + multiplier_max=AREA_MULTIPLIER_MAX, + constraint_tol=AREA_CONSTRAINT_TOL, +): + """ + Drop targets that cannot be reached within tolerance. + + A target is impossible if: + - All B matrix values are zero (no records contribute), or + - The target is outside the achievable range even with all + multipliers at their extreme bounds. + """ + m = len(targets) + drop = np.zeros(m, dtype=bool) + + for j in range(m): + # Extract one sparse row at a time — avoids 310 MB dense copy + row = B_csc.getrow(j).toarray().ravel() + pos = row > 0 + neg = row < 0 + + # All zeros + if not pos.any() and not neg.any(): + out.write( + f"DROPPING impossible target" f" (all zeros): {labels[j]}\n" + ) + drop[j] = True + continue + + # Compute achievable range + max_val = ( + multiplier_max * row[pos].sum() + multiplier_min * row[neg].sum() + ) + min_val = ( + multiplier_min * row[pos].sum() + multiplier_max * row[neg].sum() + ) + + tgt = targets[j] + tol = abs(tgt) * constraint_tol + if max_val < tgt - tol or min_val > tgt + tol: out.write( - f"DROPPING impossible target" f" (all zeros): {labels[i]}\n" + f"DROPPING unreachable target: {labels[j]}" + f" (target={tgt:.0f}," + f" achievable=[{min_val:.0f}, {max_val:.0f}])\n" ) - keep = ~all_zero - B_dense = B_csc.toarray() - B_csc = csc_matrix(B_dense[keep, :]) + drop[j] = True + + n_drop = int(drop.sum()) + if n_drop > 0: + keep = ~drop + # Slice sparse matrix directly — no dense conversion + keep_idx = np.where(keep)[0] + B_csc = B_csc[keep_idx, :] targets = targets[keep] labels = [lab for lab, k in zip(labels, keep) if k] out.write( - f"Dropped {n_drop} impossible targets," + f"Dropped {n_drop} impossible/unreachable targets," f" {len(targets)} remaining\n" ) return B_csc, targets, labels +def _check_feasibility( + B_csc, + targets, + labels, + n_records, + constraint_tol=AREA_CONSTRAINT_TOL, + multiplier_min=AREA_MULTIPLIER_MIN, + multiplier_max=AREA_MULTIPLIER_MAX, + out=None, +): + """ + Fast LP feasibility check before running the full QP. + + Solves: minimize 0 subject to + cl <= Bx + s_lo - s_hi <= cu + multiplier_min <= x <= multiplier_max + s_lo, s_hi >= 0 + + If infeasible, identifies which constraints contribute most + to the infeasibility by solving a relaxed LP that minimizes + total slack. + + Returns + ------- + dict with keys: + feasible : bool + slack_needed : np.ndarray or None + Per-constraint slack needed (0 if feasible). + worst_labels : list of (label, slack_amount) + Top constraints requiring the most slack. + """ + if out is None: + out = sys.stdout + + m = len(targets) + abs_targets = np.abs(targets) + tol_band = abs_targets * constraint_tol + cl = targets - tol_band + cu = targets + tol_band + + # LP: minimize sum(s_lo + s_hi) + # Variables: [x (n_records), s_lo (m), s_hi (m)] + n_total = n_records + 2 * m + + # Objective: minimize sum of slacks + c_obj = np.zeros(n_total) + c_obj[n_records:] = 1.0 # minimize total slack + + # Constraints: cl <= Bx + s_lo - s_hi <= cu + # Rewrite as: + # Bx + s_lo - s_hi >= cl => -Bx - s_lo + s_hi <= -cl + # Bx + s_lo - s_hi <= cu + # Build entirely in sparse format to avoid ~1.2 GB dense alloc + I_m = speye(m, format="csc") + # [B | I | -I] x <= cu + A_upper = hstack([B_csc, I_m, -I_m], format="csc") + # [-B | -I | I] x <= -cl + A_lower = hstack([-B_csc, -I_m, I_m], format="csc") + A_ub = vstack([A_upper, A_lower], format="csc") + b_ub = np.concatenate([cu, -cl]) + + # Bounds + if isinstance(multiplier_max, np.ndarray): + x_ub = multiplier_max + else: + x_ub = np.full(n_records, multiplier_max) + bounds = np.column_stack( + [ + np.concatenate( + [ + np.full(n_records, multiplier_min), + np.zeros(2 * m), + ] + ), + np.concatenate( + [ + x_ub, + np.full(2 * m, np.inf), + ] + ), + ] + ) + + result = linprog( + c_obj, + A_ub=A_ub, + b_ub=b_ub, + bounds=bounds, + method="highs", + options={"presolve": True, "time_limit": 30}, + ) + + info = {"feasible": False, "slack_needed": None, "worst_labels": []} + + if result.success: + s_lo = result.x[n_records : n_records + m] + s_hi = result.x[n_records + m :] + total_slack = s_lo + s_hi + info["feasible"] = total_slack.max() < 1e-6 + info["slack_needed"] = total_slack + + if not info["feasible"]: + # Report constraints needing the most slack + worst_idx = np.argsort(total_slack)[::-1] + worst = [] + for idx in worst_idx[:10]: + if total_slack[idx] > 1e-6: + rel = total_slack[idx] / max(abs(targets[idx]), 1) + worst.append((labels[idx], total_slack[idx], rel)) + info["worst_labels"] = worst + + out.write( + f"PRE-SOLVE FEASIBILITY: INFEASIBLE" + f" — {len(worst)} constraints need slack\n" + ) + for lbl, slk, rel in worst[:5]: + out.write( + f" {lbl}: slack={slk:.0f}" f" ({rel:.1%} of target)\n" + ) + else: + out.write("PRE-SOLVE FEASIBILITY: OK\n") + else: + out.write( + f"PRE-SOLVE FEASIBILITY: LP solver failed" f" ({result.message})\n" + ) + + return info + + +def _assign_slack_penalties( + labels, + default_penalty=AREA_SLACK_PENALTY, + reduced_penalty=AREA_REDUCED_SLACK_PENALTY, +): + """ + Assign per-constraint slack penalties based on label content. + + Most constraints get the default (high) penalty. Constraints + involving variables and AGI bins where SOI-to-TMD mapping is + inherently noisy get a reduced penalty — the solver will still + try to hit them but will relax them before distorting weights. + + Reduced-penalty targets: + - e02400, e00300, e26270 amounts in AGI stubs 1-3 (<$25K). + These income types are concentrated among high-income or + elderly filers; low-AGI bin targets are noisy and can + conflict with other constraints in extreme CDs. + - Filing-status counts (fs=1,2,4) in stubs 1-2 (<$10K). + Very small cells in the lowest income bins. + + Returns + ------- + np.ndarray + Penalty value for each constraint, same length as labels. + """ + penalties = np.full(len(labels), default_penalty) + reduced_vars = {"e02400", "e00300", "e26270"} + + for i, label in enumerate(labels): + parts = label.split("/") + varname = parts[0] + attrs = {} + for p in parts[1:]: + if "=" in p: + k, v = p.split("=", 1) + attrs[k] = v + + cnt = int(attrs.get("cnt", -1)) + fs = int(attrs.get("fs", 0)) + agi_raw = attrs.get("agi", "") + + # Parse AGI upper bound from "[lo,hi)" + agihi = 9e99 + m = re.match(r"\[([^,]+),([^)]+)\)", agi_raw) + if m: + agihi = float(m.group(2)) + + # Reduced penalty for problematic variable-bin combos + if varname in reduced_vars and cnt == 0 and agihi <= 25000: + penalties[i] = reduced_penalty + elif cnt == 1 and fs > 0 and agihi <= 10000: + penalties[i] = reduced_penalty + + return penalties + + def _solve_area_qp( # pylint: disable=unused-argument B_csc, targets, @@ -258,6 +539,7 @@ def _solve_area_qp( # pylint: disable=unused-argument n_records, constraint_tol=AREA_CONSTRAINT_TOL, slack_penalty=AREA_SLACK_PENALTY, + slack_penalties=None, max_iter=AREA_MAX_ITER, multiplier_min=AREA_MULTIPLIER_MIN, multiplier_max=AREA_MULTIPLIER_MAX, @@ -269,6 +551,12 @@ def _solve_area_qp( # pylint: disable=unused-argument Parameters ---------- + slack_penalty : float + Default penalty for constraint slack (used when + slack_penalties is None). + slack_penalties : np.ndarray, optional + Per-constraint slack penalties (length m). Overrides + slack_penalty when provided. weight_penalty : float Penalty weight on (x-1)^2 relative to constraint slack. Higher values keep multipliers closer to 1.0 @@ -289,9 +577,14 @@ def _solve_area_qp( # pylint: disable=unused-argument cu = targets + tol_band # diagonal Hessian: 2*alpha for x, 2*M for slacks + # Each slack pair (s_lo, s_hi) gets the same penalty hess_diag = np.empty(n_total) hess_diag[:n_records] = 2.0 * weight_penalty - hess_diag[n_records:] = 2.0 * slack_penalty + if slack_penalties is not None: + hess_diag[n_records : n_records + m] = 2.0 * slack_penalties + hess_diag[n_records + m :] = 2.0 * slack_penalties + else: + hess_diag[n_records:] = 2.0 * slack_penalty P = spdiags(hess_diag, format="csc") # linear term: -2*alpha for x @@ -366,6 +659,7 @@ def _solve_area_qp( # pylint: disable=unused-argument "iterations": result.iterations, "solve_time": elapsed, "clarabel_solve_time": result.solve_time, + "dual": np.array(result.z) if result.z is not None else None, } return x_opt, s_lo, s_hi, info @@ -556,13 +850,16 @@ def create_area_weights_file( out.write(f" max |relative error|: {rel_err_x1.max():.6f}\n") # solve QP + per_constraint_penalties = _assign_slack_penalties( + labels, default_penalty=slack_penalty + ) x_opt, s_lo, s_hi, _info = _solve_area_qp( B_csc, targets, labels, n_records, constraint_tol=constraint_tol, - slack_penalty=slack_penalty, + slack_penalties=per_constraint_penalties, max_iter=max_iter, multiplier_min=multiplier_min, multiplier_max=multiplier_max, diff --git a/tmd/areas/fingerprints/states_fingerprint.json b/tmd/areas/fingerprints/states_fingerprint.json new file mode 100644 index 00000000..358bc2f9 --- /dev/null +++ b/tmd/areas/fingerprints/states_fingerprint.json @@ -0,0 +1,57 @@ +{ + "n_areas": 51, + "per_area_int_sums": { + "AK": 412466, + "AL": 2749392, + "AR": 1622704, + "AZ": 4130134, + "CA": 22167973, + "CO": 3442035, + "CT": 2131586, + "DC": 433116, + "DE": 585680, + "FL": 13026186, + "GA": 6008652, + "HI": 832970, + "IA": 1769383, + "ID": 1031419, + "IL": 7256718, + "IN": 3843732, + "KS": 1620038, + "KY": 2468402, + "LA": 2496679, + "MA": 4216198, + "MD": 3605225, + "ME": 833930, + "MI": 5795022, + "MN": 3279543, + "MO": 3447807, + "MS": 1568159, + "MT": 647039, + "NC": 5920637, + "ND": 435558, + "NE": 1084195, + "NH": 830201, + "NJ": 5351788, + "NM": 1217003, + "NV": 1843143, + "NY": 11692200, + "OH": 6776517, + "OK": 2146996, + "OR": 2459157, + "PA": 7525412, + "RI": 666732, + "SC": 2959393, + "SD": 506402, + "TN": 3891676, + "TX": 16241382, + "UT": 1769984, + "VA": 4920809, + "VT": 393735, + "WA": 4447067, + "WI": 3419741, + "WV": 989190, + "WY": 320883 + }, + "weight_hash": "8b36ae1c2ee0c384" +} \ No newline at end of file diff --git a/tmd/areas/quality_report.py b/tmd/areas/quality_report.py index 1531cb2e..b18837df 100644 --- a/tmd/areas/quality_report.py +++ b/tmd/areas/quality_report.py @@ -760,7 +760,7 @@ def main(): args = parser.parse_args() areas = None - if args.scope: + if args.scope and args.scope.lower() != "states": areas = [s.strip().upper() for s in args.scope.split(",")] report = generate_report(areas) diff --git a/tmd/areas/solve_weights.py b/tmd/areas/solve_weights.py index ce8af3b0..2f085368 100644 --- a/tmd/areas/solve_weights.py +++ b/tmd/areas/solve_weights.py @@ -71,8 +71,7 @@ def solve_state_weights( t0 = time.time() - # --- Pass 1: unconstrained solve --- - print("Pass 1: solving state weights...") + print("Solving state weights...") run_batch( num_workers=num_workers, area_filter=area_filter, diff --git a/tmd/areas/solver_overrides.py b/tmd/areas/solver_overrides.py new file mode 100644 index 00000000..502c5b4c --- /dev/null +++ b/tmd/areas/solver_overrides.py @@ -0,0 +1,135 @@ +""" +Per-area solver override management. + +Provides a centralized mechanism for storing and loading per-area +solver parameter overrides. The override file is a YAML file with +a ``_defaults`` section and per-area entries. + +Developer mode generates this file by running iterative LP/QP +feasibility checks and applying a relaxation cascade. Production +mode reads it to configure each area's solve in a single pass. + +File format:: + + # Solver overrides for CDs + _defaults: + multiplier_max: 50 + constraint_tol: 0.005 + + # Only areas needing special treatment appear below + fl28: + drop_targets: + - "e26270/cnt=0/agi=[10000.0,25000.0)" + ny12: + constraint_tol: 0.01 + drop_targets: + - "e00300/cnt=0/agi=[-9e+99,1.0)" +""" + +from pathlib import Path +from typing import Dict, List + +import yaml + + +def load_overrides(override_path: Path) -> dict: + """ + Load solver overrides from a YAML file. + + Returns a dict with ``_defaults`` and per-area entries. + If the file doesn't exist, returns empty defaults. + """ + if not override_path.exists(): + return {"_defaults": {}} + with open(override_path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f.read()) + if data is None: + data = {} + if "_defaults" not in data: + data["_defaults"] = {} + return data + + +def get_area_overrides( + overrides: dict, + area: str, +) -> dict: + """ + Get merged overrides for a specific area. + + Returns defaults updated with any area-specific overrides. + Per-area values take precedence over defaults. + """ + defaults = dict(overrides.get("_defaults", {})) + area_specific = overrides.get(area, {}) + if area_specific: + defaults.update(area_specific) + return defaults + + +def get_drop_targets(overrides: dict, area: str) -> List[str]: + """ + Get list of target labels to drop for an area. + + Returns a list of constraint label patterns to exclude + from the QP. Empty list if no drops needed. + """ + area_ovr = get_area_overrides(overrides, area) + return area_ovr.get("drop_targets", []) + + +def should_drop_target( + label: str, + drop_patterns: List[str], +) -> bool: + """ + Check if a target label matches any drop pattern. + + Patterns are matched as substrings of the full label. + For example, "e26270/cnt=0/agi=[10000.0,25000.0)" matches + any label containing that string. + """ + for pattern in drop_patterns: + if pattern in label: + return True + return False + + +def write_overrides( + override_path: Path, + defaults: dict, + area_overrides: Dict[str, dict], +): + """ + Write solver overrides to a YAML file. + + Parameters + ---------- + override_path : Path + Output file path. + defaults : dict + Default parameters for all areas. + area_overrides : dict + Per-area overrides (only areas needing special treatment). + """ + data = {"_defaults": defaults} + for area in sorted(area_overrides.keys()): + ovr = area_overrides[area] + if ovr: # skip empty overrides + data[area] = ovr + + override_path.parent.mkdir(parents=True, exist_ok=True) + with open(override_path, "w", encoding="utf-8") as f: + # Write header comment + f.write("# Solver overrides — auto-generated by developer mode\n") + f.write( + "# Edit manually only if automatic relaxation is insufficient\n" + ) + f.write("\n") + yaml.dump( + data, + f, + default_flow_style=False, + sort_keys=False, + allow_unicode=True, + )