Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion examples/simple_ctmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@
for j in range(2):
print(f'The mean time of being in state {j} given that it started in state {i}, before absorbing is:')
print(A.absorbtion_times(start=i,target=j))



print("Let's try some absorbing probabilities")
for i in range(2):
print(f'The probability of being absorbed by state {j} given that it started in state {i} is:')
print(A.absorbtion_probabilities(target=2,start=i))
19 changes: 18 additions & 1 deletion examples/simple_dtmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,21 @@
print(O.first_passage_time(1))

print("lets try an error creating a dtcm")
error_mc=dtmc(np.array([[0.5,0.5],[0.7,0.7]]))
#error_mc=dtmc(np.array([[0.5,0.5],[0.7,0.7]]))

print("let's try the methods for absorbing chains")

abs_mc = dtmc(np.array([[0.2,0.3,0.1,0.4],
[0.1,0.2,0.5,0.2],
[0,0,1,0],
[0,0,0,1]]), states = [0,1,2,3])

for i in range(0,2):
for j in range(0,2):
print(f'The mean time of being in state {j} given that it started in state {i}, before absorbing is:')
print(abs_mc.absorbtion_times(target=j, start=i))

for i in range(0,2):
for j in range(2,4):
print(f'The probability of being absorbed by state {j} given that it started in state {i} is:')
print(abs_mc.absorbtion_probabilities(target=j, start=i))
19 changes: 18 additions & 1 deletion jmarkov/ctmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def is_ergodic(self):
else:
return False

def absorbtion_times(self,target:str,start=None):
def absorbtion_times(self,target:int,start=None):
"""
Computes the mean time spent in state target before absorption, starting from state start.

Expand Down Expand Up @@ -240,3 +240,20 @@ def absorbtion_times(self,target:str,start=None):
else:
return "start and target should be transient"

def absorbtion_probabilities(self, target:int, start=None):
"""
Computes the probability of being absorbed by state target, starting from state start.

The chain must not be ergodic for the calculation to make sense.
"""
if not self.is_ergodic():
U = self.generator
absorbing_states = np.where(np.all(U == 0, axis=1))[0]
transient_states = np.setdiff1d(np.arange(U.shape[0]), absorbing_states)
if np.isin(target, absorbing_states).all() and np.isin(start, transient_states).all():
U_ = U[np.ix_(transient_states, transient_states)]
V = U[np.ix_(transient_states, absorbing_states)]
mat_Probs = np.matmul(np.linalg.inv(-U_),V)
return mat_Probs[start,np.where(absorbing_states==target)]
else:
return "the chain shouldn't be ergodic"
56 changes: 54 additions & 2 deletions jmarkov/dtmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ def __init__(self,n:int):
self.transition_matrix = np.eye(n, dtype=float)

# initializer with a transition matrix
def __init__(self,transition_matrix:np.array):
def __init__(self,transition_matrix:np.array,states:np.array=[1]):
if not self._check_transition_matrix(transition_matrix):#Lets check if transition matrix is logical (i.e the rows sum 1)
raise ValueError("the rows of transition matrix do not sum 1 or has non positive values")
self.n_states=transition_matrix.shape[0]
self.transition_matrix = transition_matrix
self.states = states

def _check_transition_matrix(self, M:np.array):
"""
Expand Down Expand Up @@ -211,7 +212,58 @@ def is_ergodic(self):
return True
else:
return False


def absorbtion_times(self,target:int,start=None):
"""
Computes the mean time spent in state target before absorption, starting from state start.

If the chain is absorbing, both start and target must be transient.
If the chain is ergodic, this is the same as the mean first passage time
to state target, starting from state start.
"""
P = self.transition_matrix

if self.is_ergodic():
Q = np.delete(P, target, axis=0)
Q = np.delete(Q, target, axis=1)
# check that start is a transient state (must be different than target)
if start != target:
I = np.eye(Q.shape[0])
mat_Abs = np.linalg.inv(I-Q)
return mat_Abs[start,]
else:
return "start should be different than target"
else:
# check that both start and end states are transient
absorbing_states = np.where( (np.sum(P == 1, axis=1) == 1) & (np.all((P == 0) | (P == 1), axis=1)) )[0]
transient_states = np.setdiff1d(np.arange(P.shape[0]), absorbing_states)
if np.isin(target, transient_states).all() and np.isin(start, transient_states).all():
Q = P[np.ix_(transient_states, transient_states)]
I = np.eye(len(transient_states))
mat_Abs = np.linalg.inv(I-Q)
return mat_Abs[start,target]
else:
return "start and target should be transient"

def absorbtion_probabilities(self, target:int, start=None):
"""
Computes the probability of being absorbed by state target, starting from state start.

The chain must not be ergodic for the calculation to make sense.
"""
if not self.is_ergodic():
P = self.transition_matrix
absorbing_states = np.where( (np.sum(P == 1, axis=1) == 1) & (np.all((P == 0) | (P == 1), axis=1)) )[0]
transient_states = np.setdiff1d(np.arange(P.shape[0]), absorbing_states)
if np.isin(target, absorbing_states).all() and np.isin(start, transient_states).all():
Q = P[np.ix_(transient_states, transient_states)]
I = np.eye(len(transient_states))
R = P[np.ix_(transient_states, absorbing_states)]
mat_Probs = np.matmul(np.linalg.inv(I-Q),R)
return mat_Probs[start,np.where(absorbing_states==target)]
else:
return "the chain shouldn't be ergodic"




6 changes: 6 additions & 0 deletions tests/tests_ctmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,11 @@ def test_absorbtion_times(self):
mc = ctmc(Q, states)
self.assertAlmostEqual(mc.absorbtion_times(target=0,start=1)[0][0],0.1111111)

class TestAbsorbtionProbabilities(unittest.TestCase):
def test_absortion_probabilities(self):
A = ctmc(np.array([[-5, 1, 3,1], [3, -10, 3,4], [0, 0, 0,0],[0,0,0,0]]),
states = np.array([0,1,2,3]))
self.assertAlmostEqual(A.absorbtion_probabilities(target=2,start=0)[0][0], 0.702127659574468)

if __name__ == '__main__':
unittest.main()
21 changes: 21 additions & 0 deletions tests/tests_dtmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,26 @@ def test_dtmc_isergodic_false(self):
mc = dtmc(P)
self.assertFalse(mc.is_ergodic())

class TestFirstPassageTime(unittest.TestCase):
def test_dtmc_first_passage_time(self):
P = np.array([[0.8,0.2,0],[0,0.2,0.8],[0.8,0.2,0]])
mc = dtmc(P)
assert_allclose(mc.first_passage_time(0), [[2.8125], [1.5625]], err_msg="should be [[2.8125], [1.5625]]")

class TestAbsorbtionTimes(unittest.TestCase):
def test_absorbtion_times(self):
P = np.array([[0.2, 0.3, 0.1, 0.4], [0.1, 0.2, 0.5, 0.2], [0,0,1,0], [0,0,0,1]])
states = np.array([0,1,2])
mc = dtmc(P, states)
self.assertAlmostEqual(mc.absorbtion_times(target=0,start=0),1.31147540983607)

class TestAbsorbtionProbabilities(unittest.TestCase):
def test_absortion_probabilities(self):
P = np.array([[0.2, 0.3, 0.1, 0.4], [0.1, 0.2, 0.5, 0.2], [0,0,1,0], [0,0,0,1]])
states = np.array([0,1,2])
mc = dtmc(P, states)
self.assertAlmostEqual(mc.absorbtion_probabilities(target=3,start=0)[0][0], 0.622950819672131)


if __name__ == '__main__':
unittest.main()