Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 273 additions & 0 deletions HARK/SSJutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,258 @@ def make_basic_SSJ_matrices(
return SSJ


def make_flat_LC_SSJ_matrices(
agent,
shock,
outcomes,
grids,
eps=1e-4,
T_max=100,
norm=None,
PopGroFac=1.0,
ProdGroFac=1.0,
solved=False,
construct=True,
verbose=False,
):
"""
Constructs one or more sequence space Jacobian (SSJ) matrices for specified
outcomes over one shock variable. This version of the function is for life-
cycle models with "flat" demographic dynamics: the long run distribution of
ages is stable. This requires that survival probability is not endogenous to
agent actions and thus cannot be affected by shocks.

"Flat" demographic dynamics permit two very specific growth trends: constant
population growth and constant aggregate productivity growth.

This algorithm (and some of the code) are directly taken from Mateo Velasquez
and Bence Bardoczy's paper on life-cycle Jacobians, and its accompanying repo.

Parameters
----------
agent : AgentType
Agent for which the SSJ(s) should be constructed. Must have cycles = 1
or the function will throw an error. Must have a model file defined or
this won't work at all.
shock : str
Name of the variable that Jacobians will be computed with respect to.
It does not need to be a "shock" in a modeling sense, but it must be a
single-valued parameter (possibly a singleton list) that can be changed.
outcomes : str or [str]
Names of outcome variables of interest; an SSJ matrix will be constructed
for each variable named here. If a single string is passed, the output
will be a single np.array. If a list of strings are passed, the output
will be a list of SSJ matrices in the order specified here.
grids : dict
Dictionary of dictionaries with discretizing grid information. The grids
should include all arrival variables other than those that are normalized
out. They should also include all variables named in outcomes, except
outcomes that are continuation variables that remap to arrival variables.
Grid specification must include number of nodes N, should also include
min and max if the variable is continuous.
eps : float
Amount by which to perturb the shock variable. The default is 1e-4.
T_max : int
Size of the SSJ matrices: the maximum number of periods to consider.
The default is 300.
Comment on lines +354 to +356
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring says T_max default is 300, but the signature sets T_max=100. Please reconcile the documented default with the actual default value to avoid confusing callers.

Copilot uses AI. Check for mistakes.
norm : str or None
Name of the model variable to normalize by for Harmenberg aggregation,
if any. For many HARK models, this should be 'PermShk', which enables
the grid over permanent income to be omitted as an explicit state.
PopGroFac : float
Constant population growth factor, defaulting to 1. Each successive
birth cohort is this factor bigger than the prior birth cohort. With flat
demographic dynamics, this results in the entire population growing by
this factor each period as well.
ProdGroFac : float
Constant aggregate productivity growth factor, defaulting to 1. Each
successive birth cohort has permanent labor productivity that is this
factor bigger than the prior cohort. With flat demographic dynamics,
this results in aggregate productivity growing by this factor as well.
solved : bool
Whether the agent's model has already been solved. If False (default),
it will be solved as the very first step. Solving the agent's long run
model before constructing SSJ matrices has the advantage of not needing
to re-solve the long run model for each shock variable.
construct : bool
Whether the construct (update) method should be run after the shock is
updated. The default is True, which is the "safe" option. If the shock
variable is a parameter that enters the model only *directly*, rather
than being used to build a more complex model input, then this can be
set to False to save a (very) small amount of time during computation.
If it is set to False improperly, the SSJs will be very wrong, potentially
just zero everywhere.
verbose : bool
Whether to display timing/progress to screen. The default is False.

Returns
-------
SSJ : np.array or [np.array]
One or more sequence space Jacobian arrays over the outcome variables
with respect to the named shock variable.
"""
if agent.cycles != 1:
raise ValueError("This function is only compatible with life-cycle models!")
if not isinstance(outcomes, list):
outcomes = [outcomes]
no_list = True
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable no_list is not used.

Copilot uses AI. Check for mistakes.
else:
no_list = False
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable no_list is not used.

Copilot uses AI. Check for mistakes.

# Store the simulator if it exists
if hasattr(agent, "_simulator"):
simulator_backup = agent._simulator
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable simulator_backup is not used.

Copilot uses AI. Check for mistakes.

# Make sure the shock variable is age-varying
if shock in agent.time_inv:
temp = getattr(agent, shock)
setattr(agent, shock, agent.T_cycle * [temp])
agent.del_from_time_inv(shock)
agent.add_to_time_vary(shock)

Comment on lines +405 to +411
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function converts a time-invariant shock parameter into a time-varying list (and updates time_inv/time_vary), but there is no corresponding restoration step before returning. That permanently changes the passed-in agent’s parameter structure for callers. Consider snapshotting and restoring the original attribute value and time-inv/time-vary membership on exit (similar to make_basic_SSJ_matrices).

Copilot uses AI. Check for mistakes.
# Solve the long run model if it wasn't already
if not solved:
t0 = time()
agent.solve()
t1 = time()
if verbose:
print(
"Solving the long run model took {:.3f}".format(t1 - t0) + " seconds."
)
LR_soln = deepcopy(agent.solution)

# Construct the transition matrices for the long run model
t0 = time()
agent.initialize_sym()
X = agent._simulator # for easier referencing
X.make_transition_matrices(grids, norm)
LR_trans = deepcopy(X.trans_arrays) # the transition matrices in LR model
T_age = len(LR_trans)
LR_outcomes = []
outcome_grids = []
for var in outcomes:
try:
LR_outcomes.append([X.periods[t].matrices[var] for t in range(T_age)])
outcome_grids.append([X.periods[t].grids[var] for t in range(T_age)])
except:
raise ValueError(
"Outcome " + var + " was requested, but no grid was provided!"
)
t1 = time()
if verbose:
print(
"Making the transition matrix for the long run model took {:.3f}".format(
t1 - t0
)
+ " seconds."
)

# Find the steady state for the long run model
t0 = time()
X.simulate_cohort_by_grids(outcomes=["dead"] + outcomes, calc_dstn=True)
SS_dstn = deepcopy(X.state_dstn_by_age)
SS_outcomes = {}
for j in len(outcomes):
name = outcomes[j]
SS_outcomes[name] = [
np.dot(LR_outcomes[j][t], outcome_grids[j][t]) for t in range(T_age)
]
Comment on lines +454 to +458
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for j in len(outcomes): will raise TypeError: 'int' object is not iterable. Use for j in range(len(outcomes)): (or enumerate(outcomes)) when building SS_outcomes.

Copilot uses AI. Check for mistakes.
survival_by_age = 1.0 - X.history_avg["dead"]
t1 = time()
if verbose:
print(
"Finding the long run steady state took {:.3f}".format(t1 - t0)
+ " seconds."
)

# Generate the steady state distribution of ages; adjust this later for pop growth
SS_age_dstn = np.cumprod(np.concatenate(([1.0], survival_by_age)))
SS_age_dstn /= np.sum(SS_age_dstn)

# Construct the "expectation vectors" for all outcomes at all ages
t0 = time()
E_vecs = {}
for j in len(outcomes):
name = outcomes[j]
E_vecs[name] = [SS_outcomes.copy() for t in range(T_age)]
for t in range(1, T_age):
for a in range(T_age - t):
E_vecs[name][a].append(np.dot(LR_trans[a], E_vecs[a + 1][-1]))
t1 = time()
Comment on lines +473 to +480
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expectation-vector construction has multiple indexing/type errors: for j in len(outcomes) is invalid, SS_outcomes.copy() is a dict (not a vector over states), and E_vecs[a + 1][-1] indexes the dict with an int key. As written this block will fail before producing usable expectation vectors.

Copilot uses AI. Check for mistakes.
if verbose:
print(
"Constructing expectation vectors took {:.3f}".format(t1 - t0) + " seconds."
)

# Each entry of the E_vecs dictionary is a nested list. The outer index of the
# list is a, the age at t=0, and the inner index is time period t. The elements
# in the nested list are expectation vectors: the expected value of the outcome
# in period t conditional on being age a and at state space gridpoint n at t=0.

# Initialize the fake news matrices for each output
J = len(outcomes)
fake_news_array = np.zeros((J, T_age, T_age, T_age))
# Dimensions of fake news arrays:
# dim 0 --> j: index of outcome variable
# dim 1 --> a: age when news arrives
# dim 2 --> t: periods since news arrived
# dim 3 --> s: periods ahead that news arrived

# Loop over ages of the model and have the news shock apply at each one;
# k is the age index at which the shock arrives
t0 = time()
for k in reversed(range(T_age - 1)):
# Perturb the shock variable at age k
shock_val_orig = getattr(agent, shock)[k]
shock_val_new = shock_val_orig + eps
getattr(agent, shock)[k] = shock_val_new

# Solve the model starting from age k
if construct:
agent.update()
agent.solve(from_solution=LR_soln[k + 1], from_t=k + 1)

Comment on lines +503 to +513
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inside the age-perturbation loop, the shocked parameter value is never restored (and neither is agent.solution). This means later iterations will compound earlier perturbations and produce incorrect Jacobians; also leaves the passed-in agent mutated after this function finishes.

Copilot uses AI. Check for mistakes.
# Build transitions and outcomes up to age k
agent.initialize_sym()
X = agent._simulator # for easier typing
X.make_transition_matrices(grids, norm, for_t=range(k + 1)) # not FNT!
shocked_trans = deepcopy(X.trans_arrays)
shocked_outcomes = []
for var in outcomes:
temp_outcomes = []
for a in range(k + 1):
temp_outcomes.append(X.periods[a].matrices[var])
shocked_outcomes.append(temp_outcomes)

# Update the t=0 row of the fake news matrices
for j in range(J):
for a in range(k + 1):
fake_news_array[j, a, 0, k - a] += np.sum(
(shocked_outcomes[j][a] - LR_outcomes[j][a]) * SS_dstn[a]
)
Comment on lines +526 to +531
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The t=0 fake-news update is aggregating (shocked_outcomes - LR_outcomes) * SS_dstn but does not apply the outcome grid. Since X.periods[a].matrices[var] is an outcome-mapping matrix, this computes a sum of weighted matrix entries rather than the change in the average outcome. The aggregation should incorporate outcome_grids similarly to calc_derivs_of_policy_funcs.

Copilot uses AI. Check for mistakes.

# Update the other t rows of the fake news matrices
for a in range(k + 1):
D_dstn_news = (
np.dot(shocked_trans[a], SS_dstn[a]) - SS_dstn[a + 1]
).flatten()
update_FN_mats(fake_news_array, E_vecs[a + 1], D_dstn_news, T_age - 1, a, k)
Comment on lines +534 to +538
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update_FN_mats is @njit, so the evecs argument must be a NumPy array with a Numba-compatible dtype/shape. In the current caller, evecs comes from E_vecs[...], which is built as nested Python lists/dicts above; this will fail compilation when executed. Consider constructing E_vecs as a single NumPy array (e.g., shape (T, J, K)), and pass a slice of that array here.

Copilot uses AI. Check for mistakes.
t1 = time()
if verbose:
print(
"Perturbing each period of the problem took {:.3f}".format(t1 - t0)
+ " seconds."
)

# Normalize everything by the shock size and pad with 0s to the time horizon
fake_news_array /= eps

# Construct the (aggregate) SSJ object

# Structure and return outputs
return None
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

T_max, PopGroFac, and ProdGroFac are currently unused, and the function returns None despite the docstring promising SSJ matrices. If this is intentionally incomplete/WIP, consider raising NotImplementedError (or removing the public function) to avoid silent misuse.

Suggested change
return None
raise NotImplementedError(
"make_basic_SSJ_matrices is not yet implemented: it currently does not "
"construct or return SSJ matrices. Please implement or remove this "
"function before using it."
)

Copilot uses AI. Check for mistakes.


def calc_shock_response_manually(
agent,
shock,
Expand Down Expand Up @@ -692,3 +944,24 @@ def calc_ssj_from_fake_news_matrices(T, J, FN, dx): # pragma: no cover
SSJ[:, t, s] = SSJ[:, t - 1, s - 1] + FN[:, t, s]
SSJ *= dx**-1.0 # Scale by dx
return SSJ


@njit
def update_FN_mats(FN_mats, evecs, dD1, A, a, k):
"""
This is copy-pasted from Mateo's code.

Fn_mats: (J, A, A, A)
evecs : (T, n_out, G)
dD1 : (G,)
"""
n_out = FN_mats.shape[0]
G = dD1.shape[0]

for oi in range(n_out):
for t in range(1, A - a):
# compute dot(evecs[t-1, oi, :], dD1) by hand
s = 0.0
for g in range(G):
s += evecs[t - 1, oi, g] * dD1[g]
FN_mats[oi, a + t, t, k - a] += s
Loading