-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrain.py
More file actions
200 lines (170 loc) · 7 KB
/
train.py
File metadata and controls
200 lines (170 loc) · 7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import mlx.core as mx
import numpy as np
import math
import os
# Functions shared by both the training and the testing scripts.
from common import *
# Hyperparameters
HIDDEN_SIZE = 256
LEARNING_RATE = 1e-3
BETA1 = 0.9
BETA2 = 0.999
EPSILON = 1e-8
EPOCHS = 10
BATCH_SIZE = 800
HIDDEN_LAYERS = 4
# Training images. It's a numpy array of size 60000x784.
training = load_images("dataset/train-images-idx3-ubyte/train-images-idx3-ubyte")
labels = load_labels("dataset/train-labels-idx1-ubyte/train-labels-idx1-ubyte")
# The derivative of ReLU. If x > 0, it computes
# the derivative of x, which is 1.0, otherwise
# ReLU returns a constant (0.0) and thus the
# derivative is 0.
def drelu(x):
return (x > 0.0).astype(mx.float32)
# Cross-entropy loss function. Given y (the
# actual value) and y_hat (the predicted value)
# the function returns a scalar telling how
# wrong the prediction is. This function is
# very convenient since the derivative of it
# when using softmax to distribute the predictions
# is just y_hat - y.
def cross_entropy(y, y_hat):
y_hat = mx.clip(y_hat, 1e-9, 1 - 1e-9)
return -mx.mean(mx.sum(y * mx.log(y_hat), axis=1))
# Input layer weights and biases
W1 = mx.random.normal((28*28, HIDDEN_SIZE)) * math.sqrt(2 / (28*28))
b1 = mx.zeros((HIDDEN_SIZE,))
# Output layer weights and biases
W_out = mx.random.normal((HIDDEN_SIZE, 10)) * math.sqrt(2 / HIDDEN_SIZE)
b_out = mx.zeros((10,))
# Hidden layers weights and biases
W_hidden = [mx.random.normal((HIDDEN_SIZE, HIDDEN_SIZE)) * math.sqrt(2 / HIDDEN_SIZE) for _ in range(HIDDEN_LAYERS)]
b_hidden = [mx.zeros((HIDDEN_SIZE,)) for _ in range(HIDDEN_LAYERS)]
# These are the matrices needed for Adam optimization.
# for specifics on the implementation of the algorithm
# refer to https://arxiv.org/abs/1412.6980
# Input weights moment
mW1 = mx.zeros_like(W1)
vW1 = mx.zeros_like(W1)
# Input biases moment
mb1 = mx.zeros_like(b1)
vb1 = mx.zeros_like(b1)
# Hidden layers weights moment
mW_hidden = [mx.zeros_like(W_hidden[0]) for _ in range(HIDDEN_LAYERS)]
vW_hidden = [mx.zeros_like(W_hidden[0]) for _ in range(HIDDEN_LAYERS)]
# Hidden layers biases moment
mb_hidden = [mx.zeros_like(b_hidden[0]) for _ in range(HIDDEN_LAYERS)]
vb_hidden = [mx.zeros_like(b_hidden[0]) for _ in range(HIDDEN_LAYERS)]
# Output weights moment
mW_out = mx.zeros_like(W_out)
vW_out = mx.zeros_like(W_out)
# Output biases moment
mb_out = mx.zeros_like(b_out)
vb_out = mx.zeros_like(b_out)
t = 0
total_steps = EPOCHS * (training.shape[0] / BATCH_SIZE)
steps = 0
# Training pass, repeat the training process
# for an EPOCHS amount of iterations.
for epoch in range(EPOCHS):
for i in range(0, training.shape[0], BATCH_SIZE):
label_batch = mx.zeros((BATCH_SIZE, 10))
batch = mx.array(training[i:i+BATCH_SIZE])
for j, lbl in enumerate(labels[i:i+BATCH_SIZE]):
label_batch[j, int(lbl)] = 1.0
# Forward pass. The forward pass is what does the actual prediction, using
# the current parameters, for a given input batch X
# Input layer regression
z1 = linear(batch, W1, b1)
# Apply activation
a1 = relu(z1)
# Hidden layers regression and activation
z_hidden = [linear(a1, W_hidden[0], b_hidden[0])]
a_hidden = [relu(z_hidden[0])]
for l in range(1, HIDDEN_LAYERS):
z_hidden.append(linear(a_hidden[-1], W_hidden[l], b_hidden[l]))
a_hidden.append(relu(z_hidden[l]))
# Output layer regression
z_out = linear(a_hidden[-1], W_out, b_out)
# Apply softmax
y_hat = softmax(z_out)
# Back propagation. This is where the model "learns"
# by computing how wrong it is to then proceed to
# adjust parameters.
# Calculate the loss using cross-entropy
loss = cross_entropy(label_batch, y_hat)
# Calculate the gradient of the loss function w.r.t.
# the output.
d_loss = (y_hat - label_batch) / batch.shape[0]
# Calculate the gradient of the loss function w.r.t.
# the weights of the output layer.
dW_out = a_hidden[-1].T @ d_loss
# Calculate the gradient of the loss function w.r.t.
# the biases of the output layer.
db_out = mx.sum(d_loss, axis=0)
# Calculate the gradient of the loss function w.r.t.
# the weights and biases of all the hidden layers.
# Gradients of hidden layers
dW_hidden = [None] * HIDDEN_LAYERS
db_hidden = [None] * HIDDEN_LAYERS
# Gradient of previous layer
d_hidden = d_loss
for l in reversed(range(HIDDEN_LAYERS)):
WT = W_out.T if l == HIDDEN_LAYERS - 1 else W_hidden[l + 1].T
d_hidden = (d_hidden @ WT) * drelu(z_hidden[l])
a_prev = a1 if l == 0 else a_hidden[l - 1]
dW_hidden[l] = a_prev.T @ d_hidden
db_hidden[l] = mx.sum(d_hidden, axis=0)
# Backpropagate through the first hidden layer
d1 = (d_hidden @ W_hidden[0].T) * drelu(z_hidden[0])
# Compute gradients for the input layer weights and biases
dW1 = batch.T @ d1
# Calculate the gradient of the loss function w.r.t.
# the biases of the input layer.
db1 = mx.sum(d1, axis=0)
# Adam optimization.
t += 1
# Input layer weights optimization with bias correction.
for theta, gradient, m, v in zip(
[W1, *W_hidden, W_out],
[dW1, *dW_hidden, dW_out],
[mW1, *mW_hidden, mW_out],
[vW1, *vW_hidden, vW_out],
):
m[:] = BETA1 * m + (1 - BETA1) * gradient
v[:] = BETA2 * v + (1 - BETA2) * (gradient ** 2)
m_hat = m / (1 - (BETA1 ** t))
v_hat = v / (1 - (BETA2 ** t))
theta[:] -= LEARNING_RATE * m_hat / (mx.sqrt(v_hat) + EPSILON)
# Input layer biases optimization.
for theta, gradient, m, v in zip(
[b1, *b_hidden, b_out],
[db1, *db_hidden, db_out],
[mb1, *mb_hidden, mb_out],
[vb1, *vb_hidden, vb_out],
):
m[:] = BETA1 * m + (1 - BETA1) * gradient
v[:] = BETA2 * v + (1 - BETA2) * (gradient ** 2)
m_hat = m / (1 - (BETA1 ** t))
v_hat = v / (1 - (BETA2 ** t))
theta[:] -= LEARNING_RATE * m_hat / (mx.sqrt(v_hat) + EPSILON)
steps += 1
if steps % 100 == 0:
mx.eval(m)
mx.eval(v)
if steps % 500 == 0:
print(f"\rtraining: {(100 * (steps / total_steps)):.2f}%, loss: {loss}", end="")
LEARNING_RATE *= 0.99
print("\nTraining complete.")
if not os.path.exists("weights"):
os.makedirs("weights")
if not os.path.exists("biases"):
os.makedirs("biases")
np.savetxt("weights/weights1.txt", np.array(W1))
np.savetxt("biases/biases1.txt", np.array(b1))
for i in range(HIDDEN_LAYERS):
np.savetxt(f"weights/weights_hidden{i}.txt", np.array(W_hidden[i]))
np.savetxt(f"biases/biases_hidden{i}.txt", np.array(b_hidden[i]))
np.savetxt("weights/weights3.txt", np.array(W_out))
np.savetxt("biases/biases3.txt", np.array(b_out))