Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 313 additions & 4 deletions HARK/ConsumptionSaving/ConsMarkovModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@
MargValueFuncCRRA,
ValueFuncCRRA,
)


import scipy.sparse as sp
from HARK.utilities import(

jump_to_grid_1D,
jump_to_grid_2D,
gen_tran_matrix_1D,
gen_tran_matrix_2D,
)

from copy import deepcopy

from HARK.rewards import (
CRRAutility,
CRRAutility_inv,
Expand Down Expand Up @@ -866,10 +879,22 @@ def check_markov_inputs(self):
StateCount = self.MrkvArray[0].shape[0]

# Check that arrays are the right shape
if not isinstance(self.Rfree, np.ndarray) or self.Rfree.shape != (StateCount,):
raise ValueError(
"Rfree not the right shape, it should an array of Rfree of all the states."
)
if isinstance(self.Rfree,list):

for rfree_t in self.Rfree:

# Check that arrays are the right shape
if not isinstance(rfree_t, np.ndarray) or rfree_t.shape != (StateCount,):
raise ValueError(
"Rfree not the right shape, it should an array of Rfree of all the states."
)
else:

# Check that arrays are the right shape
if not isinstance(self.Rfree, np.ndarray) or self.Rfree.shape != (StateCount,):
raise ValueError(
"Rfree not the right shape, it should an array of Rfree of all the states."
)
Comment on lines +882 to +897
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check_markov_inputs now accepts self.Rfree as a list of per-period arrays, but other methods (e.g., get_Rfree) still assume self.Rfree is a single NumPy array indexed directly by the Markov state. As a result, passing a list for Rfree will pass validation here but later fail or behave incorrectly when used, so either the rest of the class needs to be generalized to handle list-valued Rfree or this method should continue to enforce an array-only contract.

Suggested change
if isinstance(self.Rfree,list):
for rfree_t in self.Rfree:
# Check that arrays are the right shape
if not isinstance(rfree_t, np.ndarray) or rfree_t.shape != (StateCount,):
raise ValueError(
"Rfree not the right shape, it should an array of Rfree of all the states."
)
else:
# Check that arrays are the right shape
if not isinstance(self.Rfree, np.ndarray) or self.Rfree.shape != (StateCount,):
raise ValueError(
"Rfree not the right shape, it should an array of Rfree of all the states."
)
if not isinstance(self.Rfree, np.ndarray) or self.Rfree.shape != (StateCount,):
raise ValueError(
"Rfree not the right shape, it should be a NumPy array of Rfree for all the states."
)

Copilot uses AI. Check for mistakes.

# Check that arrays in lists are the right shape
for MrkvArray_t in self.MrkvArray:
Expand Down Expand Up @@ -1209,6 +1234,290 @@ def calc_bounding_values(self):
"""
raise NotImplementedError()



def calc_transition_matrix(self, shk_dstn = None):
'''
Calculates how the distribution of agents across market resources
transitions from one period to the next. If finite horizon problem, then calculates
a list of transition matrices, consumption and asset policy grids for each period of the problem.
The transition matrix/matrices and consumption and asset policy grid(s) are stored as attributes of self.


Parameters
----------
shk_dstn: list
list of income shock distributions

Returns
-------
None

'''

self.state_num = len(self.MrkvArray[0])

if self.cycles == 0:

markov_array = self.MrkvArray

eigen, ss_dstn = sp.linalg.eigs(markov_array[0].T , k=1, which='LM')


ss_dstn = ss_dstn[:,0] / np.sum(ss_dstn[:,0]) # Steady state distribution of employed/unemployed

states = len(ss_dstn)



if shk_dstn == None:
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing for None should use the 'is' operator.

Suggested change
if shk_dstn == None:
if shk_dstn is None:

Copilot uses AI. Check for mistakes.
shk_dstn = self.IncShkDstn[0]

dist_mGrid = self.dist_mGrid #Grid of market resources
dist_pGrid = self.dist_pGrid #Grid of permanent incomes

self.cPol_Grid = []
self.aPol_Grid = []

bNext = []
shk_prbs = []
tran_shks = []
perm_shks = []

for i in range(states):
c_next = self.solution[0].cFunc[i](dist_mGrid)
self.cPol_Grid.append(c_next)

a_next_i = dist_mGrid - c_next
self.aPol_Grid .append(a_next_i)

if type(self.Rfree) == list:
b_next_i = self.Rfree[0][0] * a_next_i
else:
b_next_i = self.Rfree * a_next_i
Comment on lines +1294 to +1297
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the infinite-horizon branch of calc_transition_matrix, b_next_i is computed as self.Rfree * a_next_i when self.Rfree is a NumPy array of per-state interest factors, which produces a 2D array (state_count × m_grid_size). gen_tran_matrix_1D/gen_tran_matrix_2D expect a 1D bNext vector for a single current state, and you also ignore the dependence of Rfree on the current Markov state; this will either raise shape errors or generate incorrect transitions. You likely want to use the interest factor for state i only (e.g., a scalar extracted from self.Rfree) so that b_next_i has the same shape as a_next_i.

Suggested change
if type(self.Rfree) == list:
b_next_i = self.Rfree[0][0] * a_next_i
else:
b_next_i = self.Rfree * a_next_i
# Determine the appropriate interest factor for Markov state i
if isinstance(self.Rfree, list):
Rfree_source = self.Rfree[0]
else:
Rfree_source = self.Rfree
Rfree_array = np.asarray(Rfree_source)
if Rfree_array.ndim == 0:
R_i = Rfree_array
else:
R_i = Rfree_array[i]
b_next_i = R_i * a_next_i

Copilot uses AI. Check for mistakes.

bNext.append(b_next_i)

shk_prbs.append(shk_dstn[i].pmv)
tran_shks.append(shk_dstn[i].atoms[1])
perm_shks.append(shk_dstn[i].atoms[0])



LivPrb = self.LivPrb[0][0] # Update probability of staying alive
Comment on lines +1305 to +1307
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the infinite-horizon branch, the survival probability used in the transition matrices is hard-coded as self.LivPrb[0][0], i.e., the survival probability of the first Markov state only. Since self.LivPrb is validated above as a vector of length StateCount, this effectively ignores any heterogeneity in survival probabilities across Markov states; consider indexing by i when constructing the transition matrix if state-specific survival is intended.

Copilot uses AI. Check for mistakes.


if len(dist_pGrid) == 1:

self.tran_matrix = []

for i in range(self.state_num):

TranMatrix_i = np.zeros( (len(dist_mGrid),len(dist_mGrid)) )

for j in range(self.state_num):
NewBornDist = jump_to_grid_1D(tran_shks[j],shk_prbs[j],dist_mGrid)

TranMatrix_i += gen_tran_matrix_1D(dist_mGrid,bNext[i],self.MrkvArray[0][i][j]*shk_prbs[j],perm_shks[j],tran_shks[j], LivPrb,NewBornDist)
self.tran_matrix.append(TranMatrix_i)

self.prb_dstn = ss_dstn.real


else:

self.tran_matrix = []

for i in range(self.state_num):

TranMatrix_i = np.zeros( (len(dist_mGrid),len(dist_mGrid)) )

for j in range(self.state_num):
NewBornDist = jump_to_grid_2D(tran_shks[j],np.ones_like(tran_shks[i]),shk_prbs[j],dist_mGrid,dist_pGrid)

TranMatrix_i += gen_tran_matrix_2D(dist_mGrid,dist_pGrid,bNext[i],self.MrkvArray[0][i][j]*shk_prbs[j],perm_shks[j],tran_shks[j], LivPrb,NewBornDist)
self.tran_matrix.append(TranMatrix_i)
Comment on lines +1331 to +1339
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When len(dist_pGrid) > 1, TranMatrix_i is initialized with shape (len(dist_mGrid), len(dist_mGrid)) but gen_tran_matrix_2D returns a matrix of shape (len(dist_mGrid) * len(dist_pGrid), len(dist_mGrid) * len(dist_pGrid)). Adding these together will fail due to incompatible shapes; TranMatrix_i should be initialized with the same 2D shape as the output of gen_tran_matrix_2D (or you should avoid preallocating with the wrong dimensions).

Copilot uses AI. Check for mistakes.

self.prb_dstn = ss_dstn.real


elif self.cycles > 1:
print('calc_transition_matrix requires cycles = 0 or cycles = 1')

elif self.T_cycle!= 0:

# for finite horizon, we can account for changing levels of prb_unemp because of endogenous job finding probability by imposing a list of these values, so for q'th period, the probability is slightly different
if shk_dstn == None:
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing for None should use the 'is' operator.

Suggested change
if shk_dstn == None:
if shk_dstn is None:

Copilot uses AI. Check for mistakes.
shk_dstn = self.IncShkDstn

self.tran_matrix = []

dist_mGrid = self.dist_mGrid
#print(len(dist_mGrid))

self.cPol_Grid = []
self.aPol_Grid = []
self.prb_dstn = []
dstn_0 = self.dstn_0

for k in range(self.T_cycle):

if type(self.dist_pGrid) == list:
dist_pGrid = self.dist_pGrid[k] #Permanent income grid this period
else:
dist_pGrid = self.dist_pGrid #If here then use prespecified permanent income grid

bNext = []
shk_prbs = []
tran_shks = []
perm_shks = []
cPol_Grid_k = []
aPol_Grid_k = []

for i in range(self.state_num):

c_next = self.solution[k].cFunc[i](dist_mGrid)
cPol_Grid_k.append(c_next)

a_next_i = dist_mGrid - c_next
aPol_Grid_k.append(a_next_i)

if type(self.Rfree)==list:
b_next_i = self.Rfree[k][0]*a_next_i
bNext.append(b_next_i)
else:
b_next_i = self.Rfree*a_next_i
bNext.append(b_next_i)
Comment on lines +1385 to +1390
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the finite-horizon branch of calc_transition_matrix, b_next_i is computed using either self.Rfree[k][0] * a_next_i or self.Rfree * a_next_i. This both ignores the dependence of the interest factor on the current Markov state (only the first entry is ever used from each period’s vector) and, when self.Rfree is a NumPy array of per-state rates, produces a 2D array that is inconsistent with the 1D bNext expected by gen_tran_matrix_1D/gen_tran_matrix_2D. The computation of b_next_i should use the scalar interest factor corresponding to state i in period k so that bNext[i] has shape matching a_next_i.

Suggested change
if type(self.Rfree)==list:
b_next_i = self.Rfree[k][0]*a_next_i
bNext.append(b_next_i)
else:
b_next_i = self.Rfree*a_next_i
bNext.append(b_next_i)
if type(self.Rfree) == list:
R_k = self.Rfree[k]
# Allow for either scalar or per-state array at period k
if np.ndim(R_k) == 0:
R_ki = R_k
else:
R_ki = R_k[i]
else:
# Allow for scalar or 1-D array of per-state rates
if np.ndim(self.Rfree) == 0:
R_ki = self.Rfree
else:
R_ki = self.Rfree[i]
b_next_i = R_ki * a_next_i
bNext.append(b_next_i)

Copilot uses AI. Check for mistakes.


shk_prbs.append(shk_dstn[k][i].pmv)
tran_shks.append(shk_dstn[k][i].atoms[1])
perm_shks.append(shk_dstn[k][i].atoms[0])


self.cPol_Grid.append(cPol_Grid_k)
#print(len(cPol_Grid_k))

self.aPol_Grid.append(aPol_Grid_k)


LivPrb = self.LivPrb[k][0] # Update probability of staying alive this period


if len(dist_pGrid) == 1:

dstn_0 = np.dot(self.MrkvArray[k].T, dstn_0) #transposed to have columns sum up to one , I think this is the distribution of employed vs unemployed

tran_matrix_t = []

for i in range(self.state_num):

TranMatrix_i = np.zeros( (len(dist_mGrid),len(dist_mGrid)) )

for j in range(self.state_num):

NewBornDist = jump_to_grid_1D(tran_shks[j],self.MrkvArray[k][i][j]*shk_prbs[j],dist_mGrid)

TranMatrix_i += gen_tran_matrix_1D(dist_mGrid,bNext[i],self.MrkvArray[k][i][j]*shk_prbs[j],perm_shks[j],tran_shks[j], LivPrb,NewBornDist)
tran_matrix_t.append(TranMatrix_i)

self.tran_matrix.append(deepcopy(tran_matrix_t))

self.prb_dstn.append(dstn_0)


else:


dstn_0 = np.dot(self.MrkvArray[k].T, dstn_0) #transposed to have columns sum up to one , I think this is the distribution of employed vs unemployed

tran_matrix_t = []

for i in range(self.state_num):

TranMatrix_i = np.zeros( (len(dist_mGrid),len(dist_mGrid)) )

for j in range(self.state_num):

NewBornDist = jump_to_grid_2D(tran_shks[j],np.ones_like(tran_shks[i]),self.MrkvArray[k][i][j]*shk_prbs[j],dist_mGrid,dist_pGrid)

TranMatrix_i += gen_tran_matrix_2D(dist_mGrid,dist_pGrid,bNext[i],self.MrkvArray[k][i][j]*shk_prbs[j],perm_shks[j],tran_shks[j], LivPrb,NewBornDist)
tran_matrix_t.append(TranMatrix_i)

Comment on lines +1437 to +1446
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the finite-horizon, two-dimensional case (len(dist_pGrid) > 1), TranMatrix_i is created with shape (len(dist_mGrid), len(dist_mGrid)) but is incremented by the result of gen_tran_matrix_2D, which has shape (len(dist_mGrid) * len(dist_pGrid), len(dist_mGrid) * len(dist_pGrid)). This shape mismatch will raise errors at runtime; the preallocated TranMatrix_i needs to match the size of the transition matrices produced by gen_tran_matrix_2D.

Suggested change
TranMatrix_i = np.zeros( (len(dist_mGrid),len(dist_mGrid)) )
for j in range(self.state_num):
NewBornDist = jump_to_grid_2D(tran_shks[j],np.ones_like(tran_shks[i]),self.MrkvArray[k][i][j]*shk_prbs[j],dist_mGrid,dist_pGrid)
TranMatrix_i += gen_tran_matrix_2D(dist_mGrid,dist_pGrid,bNext[i],self.MrkvArray[k][i][j]*shk_prbs[j],perm_shks[j],tran_shks[j], LivPrb,NewBornDist)
tran_matrix_t.append(TranMatrix_i)
TranMatrix_i = np.zeros(
(
len(dist_mGrid) * len(dist_pGrid),
len(dist_mGrid) * len(dist_pGrid),
)
)
for j in range(self.state_num):
NewBornDist = jump_to_grid_2D(
tran_shks[j],
np.ones_like(tran_shks[i]),
self.MrkvArray[k][i][j] * shk_prbs[j],
dist_mGrid,
dist_pGrid,
)
TranMatrix_i += gen_tran_matrix_2D(
dist_mGrid,
dist_pGrid,
bNext[i],
self.MrkvArray[k][i][j] * shk_prbs[j],
perm_shks[j],
tran_shks[j],
LivPrb,
NewBornDist,
)
tran_matrix_t.append(TranMatrix_i)

Copilot uses AI. Check for mistakes.
self.tran_matrix.append(deepcopy(tran_matrix_t))

self.prb_dstn.append(dstn_0)







def calc_ergodic_dist(self, transition_matrix = None):

'''
Calculates the ergodic distribution across normalized market resources and
permanent income as the eigenvector associated with the eigenvalue 1.
The distribution is stored as attributes of self both as a vector and as a reshaped array with the ij'th element representing
the probability of being at the i'th point on the mGrid and the j'th
point on the pGrid.

Parameters
----------
transition_matrix: List
transition matrix whose ergordic distribution is to be solved
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring for calc_ergodic_dist describes the parameter as a "transition matrix whose ergordic distribution is to be solved"; "ergordic" is misspelled and should be "ergodic".

Suggested change
transition matrix whose ergordic distribution is to be solved
transition matrix whose ergodic distribution is to be solved

Copilot uses AI. Check for mistakes.

Returns
-------
None
'''


#if transition_matrix == None:
#transition_matrix = [self.tran_matrix]
Comment on lines +1477 to +1478
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment appears to contain commented-out code.

Suggested change
#if transition_matrix == None:
#transition_matrix = [self.tran_matrix]

Copilot uses AI. Check for mistakes.
self.vec_erg_dstns = []
for i in range(self.state_num):

eigen_i, ergodic_distr_i = sp.linalg.eigs(self.tran_matrix[i] , k=1 , which='LM') # Solve for ergodic distribution
ergodic_distr_i = ergodic_distr_i.real/np.sum(ergodic_distr_i.real)

#erg_dstn = compute_erg_dstn(self.tran_matrix[i])
#self.ergodic_distrs.append(erg_dstn)

ergodic_distr_i = ergodic_distr_i.T[0]
self.vec_erg_dstns.append(ergodic_distr_i)



Comment on lines +1469 to +1492
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calc_ergodic_dist takes a transition_matrix argument in its signature and docstring but ignores it and always operates on self.tran_matrix[i]. Moreover, when self.T_cycle != 0, self.tran_matrix is built as a list of lists (per period and per state), so self.tran_matrix[i] will be a list rather than a NumPy array and sp.linalg.eigs will fail; either restrict this method to the infinite-horizon layout or generalize it (and the data structure) to support the finite-horizon case, and align the implementation with the documented parameter usage.

Suggested change
transition matrix whose ergordic distribution is to be solved
Returns
-------
None
'''
#if transition_matrix == None:
#transition_matrix = [self.tran_matrix]
self.vec_erg_dstns = []
for i in range(self.state_num):
eigen_i, ergodic_distr_i = sp.linalg.eigs(self.tran_matrix[i] , k=1 , which='LM') # Solve for ergodic distribution
ergodic_distr_i = ergodic_distr_i.real/np.sum(ergodic_distr_i.real)
#erg_dstn = compute_erg_dstn(self.tran_matrix[i])
#self.ergodic_distrs.append(erg_dstn)
ergodic_distr_i = ergodic_distr_i.T[0]
self.vec_erg_dstns.append(ergodic_distr_i)
transition matrix whose ergordic distribution is to be solved.
If None, uses self.tran_matrix (infinite-horizon layout).
Returns
-------
None
'''
# Determine which transition matrix collection to use
if transition_matrix is None:
transition_matrix = self.tran_matrix
# Basic validation: must be an indexable collection of 2-D matrices
try:
num_blocks = len(transition_matrix)
except TypeError:
raise TypeError(
"transition_matrix must be an indexable collection of 2-D transition matrices."
)
# Detect and explicitly reject the finite-horizon nested layout when using self.tran_matrix
if transition_matrix is self.tran_matrix:
if (
num_blocks > 0
and isinstance(transition_matrix[0], list)
and len(transition_matrix[0]) > 0
and isinstance(transition_matrix[0][0], list)
):
raise NotImplementedError(
"calc_ergodic_dist is only implemented for the infinite-horizon "
"layout. For finite-horizon problems, please supply a flat list "
"of 2-D transition matrices via the transition_matrix argument."
)
self.vec_erg_dstns = []
for i in range(num_blocks):
mat_i = transition_matrix[i]
# If provided as a (nested) list, convert to a NumPy array
if not isinstance(mat_i, (np.ndarray, sp.spmatrix)):
mat_i = np.asarray(mat_i)
if mat_i.ndim != 2:
raise ValueError(
"Each element of transition_matrix must be a 2-D square matrix."
)
# Convert dense matrices to sparse format for eigs
if not isinstance(mat_i, sp.spmatrix):
mat_i = sp.csr_matrix(mat_i)
# Solve for ergodic distribution
eigen_i, ergodic_distr_i = sp.linalg.eigs(mat_i, k=1, which='LM')
ergodic_distr_i = ergodic_distr_i.real / np.sum(ergodic_distr_i.real)
ergodic_distr_i = ergodic_distr_i.T[0]
self.vec_erg_dstns.append(ergodic_distr_i)

Copilot uses AI. Check for mistakes.
def compute_steady_state(self,IncShkDstn_ntrl_msr):



Comment on lines +1493 to +1496
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method requires 2 positional arguments, whereas overridden IndShockConsumerType.compute_steady_state requires 1.

Suggested change
def compute_steady_state(self,IncShkDstn_ntrl_msr):
def compute_steady_state(self, IncShkDstn_ntrl_msr=None):
# Preserve previous behavior when the argument is omitted by
# explicitly raising a TypeError, similar to Python's default message.
if IncShkDstn_ntrl_msr is None:
raise TypeError(
"compute_steady_state() missing required positional argument "
"'IncShkDstn_ntrl_msr'"
)

Copilot uses AI. Check for mistakes.

#solve the consumer's problem

self.solve()

self.define_distribution_grid(dist_pGrid = np.array([1]))
self.calc_transition_matrix(IncShkDstn_ntrl_msr)
self.calc_ergodic_dist()


C = 0
A = 0
for i in range(self.state_num):

C += self.prb_dstn[i]*np.dot(self.cPol_Grid[i],self.vec_erg_dstns[i])
A += self.prb_dstn[i]*np.dot(self.aPol_Grid[i],self.vec_erg_dstns[i])


self.A_ss = A
self.C_ss = C

return C , A
Comment on lines +1493 to +1518
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compute_steady_state for MarkovConsumerType returns (C, A) while the analogous compute_steady_state method for IndShockConsumerType returns (A_ss, C_ss). This inconsistency in return order between closely related types makes it easy to misinterpret results when swapping models; consider standardizing the return tuple (and documenting it) so both classes follow the same convention.

Copilot uses AI. Check for mistakes.
Comment on lines +1239 to +1518
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new methods calc_transition_matrix, calc_ergodic_dist, and compute_steady_state for MarkovConsumerType introduce non-trivial new behavior but currently have no dedicated tests, unlike the analogous transition-matrix and Jacobian functionality for IndShockConsumerType which is covered in test_IndShockConsumerType.py. Adding unit tests that exercise these Markov-specific steady-state and transition-matrix computations would help catch the kinds of shape and eigenvalue issues present here and guard against regressions.

Copilot uses AI. Check for mistakes.


def make_euler_error_func(self, mMax=100, approx_inc_dstn=True):
"""
Creates a "normalized Euler error" function for this instance, mapping
Expand Down
Loading
Loading