forked from pyoadfe/seminars
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecision_tree.py
More file actions
108 lines (86 loc) · 3.78 KB
/
decision_tree.py
File metadata and controls
108 lines (86 loc) · 3.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from collections import namedtuple
import numpy as np
from scipy import optimize
# Terminal node: the predicted value plus the training samples that reached it.
Leaf = namedtuple('Leaf', 'value x y')
# Internal node: split feature index, split threshold, impurity of the split,
# and the two subtrees (left: feature < value, right: feature >= value).
Node = namedtuple('Node', 'feature value impurity left right')
class BaseDecisionTree:
    """Recursive CART-style binary decision tree.

    Abstract base: subclasses must implement `leaf_value` (the prediction
    stored in a leaf) and `criteria` (the impurity of a label array).
    The tree is grown eagerly in the constructor.
    """

    def __init__(self, x, y, max_depth=np.inf):
        """Build the tree from training data.

        x : array-like, shape (n_samples, n_features) — feature matrix
            (promoted to 2-D if needed).
        y : array-like, shape (n_samples,) — target values.
        max_depth : maximum tree depth; np.inf means unbounded.
        """
        self.x = np.atleast_2d(x)
        self.y = np.atleast_1d(y)
        self.max_depth = max_depth
        # BUG FIX: read the feature count from self.x, which is guaranteed
        # 2-D. The raw argument `x` may be a list (no .shape at all) or a
        # 1-D array (no second dimension), which crashed here previously.
        self.features = self.x.shape[1]
        self.root = self.build_tree(self.x, self.y)

    # NOTE: recursive — will hit Python's recursion limit for depth ~ 1000.
    def build_tree(self, x, y, depth=1):
        """Recursively grow a subtree for the samples (x, y).

        Returns a Leaf when max_depth is reached or the node is (near-)pure,
        otherwise a Node holding the best split and the two subtrees.
        """
        if depth > self.max_depth or self.criteria(y) < 1e-6:
            return Leaf(self.leaf_value(y), x, y)
        feature, value, impurity = self.find_best_split(x, y)
        left_xy, right_xy = self.partition(x, y, feature, value)
        left = self.build_tree(*left_xy, depth=depth + 1)
        right = self.build_tree(*right_xy, depth=depth + 1)
        return Node(feature, value, impurity, left, right)

    def leaf_value(self, y):
        """Value to store in a leaf for targets `y`. Subclass responsibility."""
        raise NotImplementedError

    def partition(self, x, y, feature, value):
        """Split samples on `x[:, feature] >= value`.

        Returns ((x_left, y_left), (x_right, y_right)); the right side
        holds the samples with feature values >= value.
        """
        i = x[:, feature] >= value
        j = np.logical_not(i)
        return (x[j], y[j]), (x[i], y[i])

    def _impurity_partition(self, value, feature, x, y):
        """Impurity of splitting (x, y) at `value` along `feature`.

        Argument order (value first) matches scipy.optimize.minimize_scalar.
        """
        (_, left), (_, right) = self.partition(x, y, feature, value)
        return self.impurity(left, right)

    def find_best_split(self, x, y):
        """Search all features for the split with minimal impurity.

        Returns (feature, threshold, impurity). For nodes with more than
        two samples the threshold is found by bounded scalar minimization;
        the lower bound is the second-smallest value so the left side is
        never empty.
        """
        best_feature, best_value, best_impurity = 0, x[0, 0], np.inf
        for feature in range(self.features):
            if x.shape[0] > 2:
                x_interval = np.sort(x[:, feature])
                res = optimize.minimize_scalar(
                    self._impurity_partition,
                    args=(feature, x, y),
                    bounds=(x_interval[1], x_interval[-1]),
                    method='Bounded',
                )
                # `assert` is stripped under -O; raise explicitly instead.
                if not res.success:
                    raise RuntimeError(
                        'Bounded impurity minimization failed for feature '
                        '{}'.format(feature)
                    )
                value = res.x
                impurity = res.fun
            else:
                value = np.max(x[:, feature])
                impurity = self._impurity_partition(value, feature, x, y)
            if impurity < best_impurity:
                best_feature, best_value, best_impurity = feature, value, impurity
        return best_feature, best_value, best_impurity

    # Can be optimized for a given .criteria()
    def impurity(self, left, right):
        """Sample-weighted mean impurity of the two label arrays."""
        h_l = self.criteria(left)
        h_r = self.criteria(right)
        return (left.size * h_l + right.size * h_r) / (left.size + right.size)

    def criteria(self, y):
        """Impurity measure of the label array `y`. Subclass responsibility."""
        raise NotImplementedError

    def predict(self, x):
        """Predict a value for each row of `x` by descending the tree.

        Rows with `row[feature] >= value` go right, otherwise left.
        Returns an array with the same dtype as the training targets.
        """
        x = np.atleast_2d(x)
        y = np.empty(x.shape[0], dtype=self.y.dtype)
        for i, row in enumerate(x):
            node = self.root
            while not isinstance(node, Leaf):
                if row[node.feature] >= node.value:
                    node = node.right
                else:
                    node = node.left
            y[i] = node.value
        return y
class DecisionTreeClassifier(BaseDecisionTree):
    """Decision-tree classifier using the Gini impurity criterion."""

    def __init__(self, x, y, *args, random_state=None, **kwargs):
        """Fit a classification tree.

        Labels are cast to int; `random_state` seeds the RNG used to
        break ties between equally frequent leaf classes.
        """
        y = np.asarray(y, dtype=int)
        self.random_state = np.random.RandomState(random_state)
        self.classes = np.unique(y)
        super().__init__(x, y, *args, **kwargs)

    def leaf_value(self, y):
        """Most frequent class label in `y`, ties broken at random."""
        counts = np.sum(y == self.classes.reshape(-1, 1), axis=1)
        winners = self.classes[counts == np.max(counts)]
        return winners[0] if winners.size == 1 else self.random_state.choice(winners)

    def criteria(self, y):
        """Gini impurity of the label array `y`."""
        fractions = np.sum(y == self.classes.reshape(-1, 1), axis=1) / y.size
        return np.sum(fractions * (1 - fractions))