-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlingam.py
More file actions
140 lines (110 loc) · 3.99 KB
/
lingam.py
File metadata and controls
140 lines (110 loc) · 3.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-
"""Estimate LiNGAM model.
"""
# Author: Taku Yoshioka, Shohei Shimizu
# License: MIT
from copy import deepcopy
from munkres import Munkres
import numpy as np
from sklearn.decomposition import FastICA
def _nzdiaghungarian(w):
"""Permurate rows of w to minimize sum(diag(1 / w)).
"""
assert(0 < np.min(np.abs(w)))
w_ = 1 / np.abs(w)
m = Munkres()
ixs = np.vstack(m.compute(deepcopy(w_)))
# Sort by row indices
ixs = ixs[np.argsort(ixs[:, 0]), :]
# Return permutation indices
# r-th row moves to `ixs[r]`.
return ixs[:, 1]
def _slttestperm(b_i):
"""Permute rows and cols of the given matrix.
"""
n = b_i.shape[0]
remnodes = np.arange(n)
b_rem = deepcopy(b_i)
p = list()
for i in range(n):
# Find the row with all zeros
ixs = np.where(np.sum(np.abs(b_rem), axis=1) < 1e-12)[0]
if len(ixs) == 0:
# If empty, return None
return None
else:
# If more than one, rbitrarily select the first
ix = ixs[0]
p.append(remnodes[ix])
# Remove the node (and the row and column from b_rem)
remnodes = np.hstack((remnodes[:ix], remnodes[(ix + 1):]))
ixs = np.hstack((np.arange(ix), np.arange(ix + 1, len(b_rem))))
b_rem = b_rem[ixs, :]
b_rem = b_rem[:, ixs]
return np.array(p)
def _sltprune(b):
"""Finds an permutation for approximate lower triangularization.
"""
n = b.shape[0]
assert(b.shape == (n, n))
# Sort the elements of b
ixs = np.argsort(np.abs(b).ravel())
for i in range(int(n * (n + 1) / 2) - 1, (n * n)):
b_i = deepcopy(b)
# NOTE: `ravel()` returns a view of the given array
b_i.ravel()[ixs[:i]] = 0
ixs_perm = _slttestperm(b_i)
if ixs_perm is not None:
b_opt = deepcopy(b)
b_opt = b_opt[ixs_perm, :]
b_opt = b_opt[:, ixs_perm]
return b_opt, ixs_perm
raise ValueError("Failed to do lower triangularization.")
def estimate(xs, random_state=1234):
"""Estimate LiNGAM model.
Parameters
----------
xs : numpy.ndarray, shape=(n_samples, n_features)
Data matrix.
seed : int
The seed of random number generator used in the function.
Returns
-------
b_est : numpy.ndarray, shape=(n_features, n_features)
Estimated coefficient matrix with LiNGAM. This can be transformed to
a strictly lower triangular matrix by permuting rows and columns,
implying that the directed graph represented by b_est is acyclic.
NOTE: Each row of `b` corresponds to each variable, i.e., X = BX.
"""
n_samples, n_features = xs.shape
ica = FastICA(random_state=random_state, max_iter=1000).fit(xs)
#print("ica.mixing_ = \n", ica.mixing_)
w = np.linalg.pinv(ica.mixing_)
#print("w = \n", w)
#print("w.ica = \n", np.dot(ica.mixing_, w))
#print("w.ica.w = \n", np.dot(np.dot(ica.mixing_, w), ica.mixing_))
assert(w.shape == (n_features, n_features))
# TODO: check statistical independence of icasig
# icasig = ica.components_
# Permute rows of w so that np.sum(1/np.diag(w_perm)) is minimized
# Permutation order does not make sense in the following processing,
# because the permutation is canceled with independent components, whose
# order is arbitrary.
ixs_perm = _nzdiaghungarian(w)
w_perm = np.zeros_like(w)
w_perm[ixs_perm] = w
# Divide each row of wp by the diagonal element
w_perm = w_perm / np.diag(w_perm)[:, np.newaxis]
#print(w_perm)
# Estimate b
b_est = np.eye(n_features) - w_perm
#print(b_est)
# Permute the rows and columns of b_est
b_csl, p_csl = _sltprune(b_est)
# Set the upper triangular to zero
b_csl = np.tril(b_csl, -1)
# Permute b_csl back to the original variable
b_est = b_csl # just rename here
b_est[p_csl, :] = deepcopy(b_est)
b_est[:, p_csl] = deepcopy(b_est)
return b_est