From e81658818bd1efc7393df4076643c3d67550fd41 Mon Sep 17 00:00:00 2001 From: mgabyurango Date: Mon, 14 Apr 2025 11:56:17 -0500 Subject: [PATCH] add on absorbtion methos for dtmc --- examples/simple_ctmc.py | 7 +++++- examples/simple_dtmc.py | 19 +++++++++++++- jmarkov/ctmc.py | 19 +++++++++++++- jmarkov/dtmc.py | 56 +++++++++++++++++++++++++++++++++++++++-- tests/tests_ctmc.py | 6 +++++ tests/tests_dtmc.py | 21 ++++++++++++++++ 6 files changed, 123 insertions(+), 5 deletions(-) diff --git a/examples/simple_ctmc.py b/examples/simple_ctmc.py index 5e0bd40..0968071 100644 --- a/examples/simple_ctmc.py +++ b/examples/simple_ctmc.py @@ -44,4 +44,9 @@ for j in range(2): print(f'The mean time of being in state {j} given that it started in state {i}, before absorbing is:') print(A.absorbtion_times(start=i,target=j)) - \ No newline at end of file + + +print("Let's try some absorbing probabilities") +for i in range(2): + print(f'The probability of being absorbed by state {j} given that it started in state {i} is:') + print(A.absorbtion_probabilities(target=2,start=i)) \ No newline at end of file diff --git a/examples/simple_dtmc.py b/examples/simple_dtmc.py index 36c0930..c41562c 100644 --- a/examples/simple_dtmc.py +++ b/examples/simple_dtmc.py @@ -128,4 +128,21 @@ print(O.first_passage_time(1)) print("lets try an error creating a dtcm") -error_mc=dtmc(np.array([[0.5,0.5],[0.7,0.7]])) \ No newline at end of file +#error_mc=dtmc(np.array([[0.5,0.5],[0.7,0.7]])) + +print("let's try the methods for absorbing chains") + +abs_mc = dtmc(np.array([[0.2,0.3,0.1,0.4], + [0.1,0.2,0.5,0.2], + [0,0,1,0], + [0,0,0,1]]), states = [0,1,2,3]) + +for i in range(0,2): + for j in range(0,2): + print(f'The mean time of being in state {j} given that it started in state {i}, before absorbing is:') + print(abs_mc.absorbtion_times(target=j, start=i)) + +for i in range(0,2): + for j in range(2,4): + print(f'The probability of being absorbed by state {j} given that it started in state {i} is:') + print(abs_mc.absorbtion_probabilities(target=j, start=i)) \ No newline at end of file diff --git a/jmarkov/ctmc.py b/jmarkov/ctmc.py index 875e8b1..266377f 100644 --- a/jmarkov/ctmc.py +++ b/jmarkov/ctmc.py @@ -208,7 +208,7 @@ def is_ergodic(self): else: return False - def absorbtion_times(self,target:str,start=None): + def absorbtion_times(self,target:int,start=None): """ Computes the mean time spent in state target before absorption, starting from state start. @@ -240,3 +240,20 @@ def absorbtion_times(self,target:str,start=None): else: return "start and target should be transient" + def absorbtion_probabilities(self, target:int, start=None): + """ + Computes the probability of being absorbed by state target, starting from state start. + + The chain must not be ergodic for the calculation to make sense. + """ + if not self.is_ergodic(): + U = self.generator + absorbing_states = np.where(np.all(U == 0, axis=1))[0] + transient_states = np.setdiff1d(np.arange(U.shape[0]), absorbing_states) + if np.isin(target, absorbing_states).all() and np.isin(start, transient_states).all(): + U_ = U[np.ix_(transient_states, transient_states)] + V = U[np.ix_(transient_states, absorbing_states)] + mat_Probs = np.matmul(np.linalg.inv(-U_),V) + return mat_Probs[start,np.where(absorbing_states==target)] + else: + return "the chain shouldn't be ergodic" diff --git a/jmarkov/dtmc.py b/jmarkov/dtmc.py index 98d377a..55af97e 100644 --- a/jmarkov/dtmc.py +++ b/jmarkov/dtmc.py @@ -38,11 +38,12 @@ def __init__(self,n:int): self.transition_matrix = np.eye(n, dtype=float) # initializer with a transition matrix - def __init__(self,transition_matrix:np.array): + def __init__(self,transition_matrix:np.array,states:np.array=[1]): if not self._check_transition_matrix(transition_matrix):#Lets check if transition matrix is logical (i.e the rows sum 1) raise ValueError("the rows of transition matrix do not sum 1 or has non positive values") self.n_states=transition_matrix.shape[0] self.transition_matrix = transition_matrix + self.states = states def _check_transition_matrix(self, M:np.array): """ @@ -211,7 +212,58 @@ def is_ergodic(self): return True else: return False - + + def absorbtion_times(self,target:int,start=None): + """ + Computes the mean time spent in state target before absorption, starting from state start. + + If the chain is absorbing, both start and target must be transient. + If the chain is ergodic, this is the same as the mean first passage time + to state target, starting from state start. + """ + P = self.transition_matrix + + if self.is_ergodic(): + Q = np.delete(P, target, axis=0) + Q = np.delete(Q, target, axis=1) + # check that start is a transient state (must be different than target) + if start != target: + I = np.eye(Q.shape[0]) + mat_Abs = np.linalg.inv(I-Q) + return mat_Abs[start,] + else: + return "start should be different than target" + else: + # check that both start and end states are transient + absorbing_states = np.where( (np.sum(P == 1, axis=1) == 1) & (np.all((P == 0) | (P == 1), axis=1)) )[0] + transient_states = np.setdiff1d(np.arange(P.shape[0]), absorbing_states) + if np.isin(target, transient_states).all() and np.isin(start, transient_states).all(): + Q = P[np.ix_(transient_states, transient_states)] + I = np.eye(len(transient_states)) + mat_Abs = np.linalg.inv(I-Q) + return mat_Abs[start,target] + else: + return "start and target should be transient" + + def absorbtion_probabilities(self, target:int, start=None): + """ + Computes the probability of being absorbed by state target, starting from state start. + + The chain must not be ergodic for the calculation to make sense. + """ + if not self.is_ergodic(): + P = self.transition_matrix + absorbing_states = np.where( (np.sum(P == 1, axis=1) == 1) & (np.all((P == 0) | (P == 1), axis=1)) )[0] + transient_states = np.setdiff1d(np.arange(P.shape[0]), absorbing_states) + if np.isin(target, absorbing_states).all() and np.isin(start, transient_states).all(): + Q = P[np.ix_(transient_states, transient_states)] + I = np.eye(len(transient_states)) + R = P[np.ix_(transient_states, absorbing_states)] + mat_Probs = np.matmul(np.linalg.inv(I-Q),R) + return mat_Probs[start,np.where(absorbing_states==target)] + else: + return "the chain shouldn't be ergodic" + diff --git a/tests/tests_ctmc.py b/tests/tests_ctmc.py index 0bee633..ae21056 100644 --- a/tests/tests_ctmc.py +++ b/tests/tests_ctmc.py @@ -33,5 +33,11 @@ def test_absorbtion_times(self): mc = ctmc(Q, states) self.assertAlmostEqual(mc.absorbtion_times(target=0,start=1)[0][0],0.1111111) +class TestAbsorbtionProbabilities(unittest.TestCase): + def test_absortion_probabilities(self): + A = ctmc(np.array([[-5, 1, 3,1], [3, -10, 3,4], [0, 0, 0,0],[0,0,0,0]]), + states = np.array([0,1,2,3])) + self.assertAlmostEqual(A.absorbtion_probabilities(target=2,start=0)[0][0], 0.702127659574468) + if __name__ == '__main__': unittest.main() \ No newline at end of file diff --git a/tests/tests_dtmc.py b/tests/tests_dtmc.py index 1c5e2b4..a7fcb6e 100644 --- a/tests/tests_dtmc.py +++ b/tests/tests_dtmc.py @@ -76,5 +76,26 @@ def test_dtmc_isergodic_false(self): mc = dtmc(P) self.assertFalse(mc.is_ergodic()) +class TestFirstPassageTime(unittest.TestCase): + def test_dtmc_first_passage_time(self): + P = np.array([[0.8,0.2,0],[0,0.2,0.8],[0.8,0.2,0]]) + mc = dtmc(P) + assert_allclose(mc.first_passage_time(0), [[2.8125], [1.5625]], err_msg="should be [[2.8125], [1.5625]]") + +class TestAbsorbtionTimes(unittest.TestCase): + def test_absorbtion_times(self): + P = np.array([[0.2, 0.3, 0.1, 0.4], [0.1, 0.2, 0.5, 0.2], [0,0,1,0], [0,0,0,1]]) + states = np.array([0,1,2]) + mc = dtmc(P, states) + self.assertAlmostEqual(mc.absorbtion_times(target=0,start=0),1.31147540983607) + +class TestAbsorbtionProbabilities(unittest.TestCase): + def test_absortion_probabilities(self): + P = np.array([[0.2, 0.3, 0.1, 0.4], [0.1, 0.2, 0.5, 0.2], [0,0,1,0], [0,0,0,1]]) + states = np.array([0,1,2]) + mc = dtmc(P, states) + self.assertAlmostEqual(mc.absorbtion_probabilities(target=3,start=0)[0][0], 0.622950819672131) + + if __name__ == '__main__': unittest.main()