190 changes: 190 additions & 0 deletions layer.py
@@ -0,0 +1,190 @@
# -*- encoding: utf-8 -*-
'''
@File : layer.py
@Time : 2021/11/30 14:43:25
@Author : sheep
@Version : 1.0
@Contact : 1173886760@qq.com
@Desc : None
'''

from ..core import *
from ..ops import *
import numpy as np

def Dense(input, feature_in, feature_out, **kargs):
    """Fully connected layer: computes input @ weight + bias.

    Args:
        input (Node): input node with dims (batch_size, feature_in)
        feature_in (int): number of input features
        feature_out (int): number of output features
    """
name = kargs.get('name', "")
mean = kargs.get('mean', 0.0)
std = kargs.get('std', 0.001)
weight = Variable((feature_in, feature_out), init=True, trainable=True, std=std, mean=mean, prefix=name)
    bias = Variable((1, feature_out), init=True, trainable=True, bias=True, prefix=name)
return AddOperator(MatMulOperator(input, weight, prefix=name), bias, prefix=name)
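
# A minimal usage sketch: for a Variable `x` with dims (batch_size, 784),
# Dense(x, 784, 128, name='fc1') builds x @ W + b with W of dims (784, 128)
# and b of dims (1, 128).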

def Conv(input, channel_in, channel_out, kernel_size, stride, padding, bias=True, **kargs):
    """2-D convolution layer.
    input should be formatted as [N, C, H, W]

    Args:
        input (Node): input node with dims (N, channel_in, H, W)
        channel_in (int): number of input channels
        channel_out (int): number of output channels
        kernel_size (int): side length of the square kernel
        stride (int): stride of the sliding window
        padding (int): zero padding added on each side of H and W
        bias (bool, optional): whether to add a learnable bias. Defaults to True.
    """

name = kargs.get('name', "")
mean = kargs.get('mean', 0.0)
std = kargs.get('std', 0.001)
# [Cin, k, k, Cout]
weight = Variable((channel_in, kernel_size, kernel_size, channel_out), init=True, trainable=True, std=std, mean=mean, prefix=name)

if bias:
        # because input is [N, C, H, W], we broadcast the bias over the (0, 2, 3) dims
bias = Variable((1, channel_out, 1, 1), init=True, trainable=True, bias=True, prefix=name)
return AddOperator(ConvOperator(input, weight, kernel_size=kernel_size, channel_in=channel_in, channel_out=channel_out,
stride=stride, padding=padding, prefix=name), bias, prefix=name)
else:
return ConvOperator(input, weight, kernel_size=kernel_size, channel_in=channel_in, channel_out=channel_out,
stride=stride, padding=padding, prefix=name)
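
# Assuming ConvOperator implements the usual convolution arithmetic, the output
# spatial size is H_out = (H + 2 * padding - kernel_size) // stride + 1 (and
# likewise for W_out), so kernel_size=3, stride=1, padding=1 preserves H and W.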

def MaxPooling(input, kernel_size, stride, **kargs):
    """Max-pooling layer.
    input should be formatted as [N, C, H, W]

    Args:
        input (Node): input node with dims (N, C, H, W)
        kernel_size (int): side length of the pooling window
        stride (int): stride of the pooling window
    """

name = kargs.get('name', "")
return MaxPoolingOperator(input, kernel_size=kernel_size, stride=stride, prefix=name)
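
# Assuming no implicit padding in the pooling operators, the pooled output size
# is (H - kernel_size) // stride + 1 per spatial dim; e.g. kernel_size=2,
# stride=2 halves a 32x32 feature map to 16x16.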

def AvgPooling(input, kernel_size, stride, **kargs):
    """Average-pooling layer.
    input should be formatted as [N, C, H, W]

    Args:
        input (Node): input node with dims (N, C, H, W)
        kernel_size (int): side length of the pooling window
        stride (int): stride of the pooling window
    """

name = kargs.get('name', "")
return AvgPoolingOperator(input, kernel_size=kernel_size, stride=stride, prefix=name)

def Flatten(input, **kargs):
    """Flatten layer.
    Uses the reshape operator to flatten the input to (batch_size, feature_size).

    Args:
        input (Node): input node with dims (batch_size, ...)
    """
name = kargs.get('name', "")
batch_size = input.dims[0]
    feature_size = np.prod(input.dims[1:])
return ReshapeOperator(input, to_shape=(batch_size, feature_size), prefix=name)
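
# e.g. an input with dims (N, 64, 8, 8) is reshaped to (N, 4096).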

def BatchNorm(input, **kargs):
    """Batch normalization layer.

    Args:
        input (Node): input node; gamma and bias broadcast over the batch dim
    """

name = kargs.get('name', "")
    gamma = Variable(tuple([1] + list(input.dims[1:])), init=False, trainable=True, prefix=name)
    # initialize gamma to all ones
    gamma.set_value(np.ones(gamma.dims))

    bias = Variable(tuple([1] + list(input.dims[1:])), init=True, trainable=True, bias=True, prefix=name)
return BatchNormOperator(input, gamma, bias, prefix=name)
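
# Assuming BatchNormOperator applies the standard transform
#   y = gamma * (x - mean) / sqrt(var + eps) + bias,
# the all-ones gamma (and, presumably, a zero-initialized bias) makes the layer
# start out as an approximate identity on the normalized input.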

def ReLU(input, **kargs):
    """ReLU activation layer.

    Args:
        input (Node): input node
    """

    name = kargs.get('name', "")
    return ReLUOperator(input, prefix=name)

def DropOut(input, drop_prob, **kargs):
    """Dropout layer.

    Args:
        input (Node): input node
        drop_prob (float): probability of dropping each unit
    """

name = kargs.get('name', "")
return DropOutOperator(input, drop_prob=drop_prob, prefix=name)

def append_namescope(name, scope):
if name == "":
return ""
else:
return '{}/{}'.format(name, scope)
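
# e.g. append_namescope('block1', 'conv1') -> 'block1/conv1'; with an empty
# parent name, sub-layers stay unprefixed.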

# referring to https://github.com/weiaicunzai/pytorch-cifar100/blob/master/models/resnet.py
def BasicBlock(input, in_channels, out_channels, stride=1, **kargs):
    """Basic residual block: two 3x3 convolutions plus a shortcut connection.

    Args:
        input (Node): input node with dims (N, in_channels, H, W)
        in_channels (int): number of input channels
        out_channels (int): number of output channels
        stride (int, optional): stride of the first convolution (and of the
            1x1 shortcut convolution when one is needed). Defaults to 1.
    """
name = kargs.get('name', "")

conv1 = Conv(input, in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False,
name=append_namescope(name, 'conv1'))
bn1 = BatchNorm(conv1, name=append_namescope(name, 'bn1'))
relu1 = ReLU(bn1, name=append_namescope(name, 'relu1'))
conv2 = Conv(relu1, out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False,
name=append_namescope(name, 'conv2'))
bn2 = BatchNorm(conv2, name=append_namescope(name, 'bn2'))
residual_function = bn2

shortcut = input
if stride != 1 or in_channels != out_channels:
conv3 = Conv(input, in_channels, out_channels, kernel_size=1, stride=stride, padding=0, bias=False,
name=append_namescope(name, 'conv3'))
bn3 = BatchNorm(conv3, name=append_namescope(name, 'bn3'))
shortcut = bn3

return ReLU(AddOperator(residual_function, shortcut, prefix=name), name=append_namescope(name, 'relu3'))
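
# Dataflow sketch of the block:
#   input -> conv1(3x3, stride) -> bn1 -> relu1 -> conv2(3x3) -> bn2 -------+
#       \-> identity (or conv3(1x1, stride) -> bn3 when shapes change) -> (+) -> ReLU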

def RNN(inputs, input_size, hidden_size=10, **kargs):
    """Recurrent neural network (a vanilla RNN, unrolled over the inputs).

    Args:
        inputs (list[Node]): one node per time step, each with dims
            (batch_size, input_size)
        input_size (int): number of input features per step
        hidden_size (int, optional): size of the hidden state. Defaults to 10.
    """
name = kargs.get('name', "")
mean = kargs.get('mean', 0.0)
std = kargs.get('std', 0.001)

    U = Variable(dims=(input_size, hidden_size), init=True, trainable=True, std=std, mean=mean, prefix=name)
    W = Variable(dims=(hidden_size, hidden_size), init=True, trainable=True, std=std, mean=mean, prefix=name)
    b = Variable(dims=(1, hidden_size), init=True, trainable=True, bias=True, prefix=name)

last_step = None
for iv in inputs:
h = AddOperator(MatMulOperator(iv, U), b)
if last_step is not None:
h = AddOperator(MatMulOperator(last_step, W), h)
h = ReLUOperator(h)
last_step = h

return last_step
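
# The loop above unrolls the vanilla RNN recurrence
#   h_t = ReLU(x_t @ U + h_{t-1} @ W + b),
# sharing U, W and b across all time steps and returning only the final
# hidden state.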
61 changes: 61 additions & 0 deletions loss.py
@@ -0,0 +1,61 @@
# -*- encoding: utf-8 -*-
'''
@File : loss.py
@Time : 2021/11/29 21:33:53
@Author : sheep
@Version : 1.0
@Contact : 1173886760@qq.com
@Desc : Loss function
'''

import numpy as np
from ..core import Node

# abstract base class for loss functions
class LossFunction(Node):
pass

class L2Loss(LossFunction):
    # assume the parent values are [batch, features] matrices
def __init__(self, *parents, **kargs) -> None:
LossFunction.__init__(self, *parents, **kargs)
self.batch_size = parents[0].dims[0]

def compute(self):
self.value = np.sum(np.square(self.parents[0].value - self.parents[1].value)) / self.batch_size

    def get_graident(self, parent):
if parent is self.parents[0]:
return 2 * np.subtract(self.parents[0].value, self.parents[1].value) / self.batch_size
else:
return 2 * np.subtract(self.parents[1].value, self.parents[0].value) / self.batch_size
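
    # For L = sum((a - b)^2) / batch_size the gradients are
    # dL/da = 2 * (a - b) / batch_size and dL/db = 2 * (b - a) / batch_size,
    # matching the two branches above.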

class CrossEntropyWithSoftMax(LossFunction):
    """Cross-entropy loss with softmax applied internally.

    parents[0] is the raw logits (not probabilities) with dims
    [batch_size, features]; parents[1] is the label with dims [batch_size].
    """
def __init__(self, *parents, **kargs) -> None:
LossFunction.__init__(self, *parents, **kargs)
self.batch_size = parents[0].dims[0]
self.eps = 1e-9

    def compute(self):
        # numerically stable softmax: subtract the row-wise max before exp.
        # note this relies on the values being np.matrix, where axis
        # reductions keep their 2-D shape and broadcast row-wise.
        input_max = np.max(self.parents[0].value, axis=1)
        input_exp = np.exp(np.subtract(self.parents[0].value, input_max))
        self.prob = input_exp / np.sum(input_exp, axis=1)

        self.label_onehot = np.zeros_like(self.prob)
        self.label_onehot[np.arange(self.batch_size), self.parents[1].value.astype(int).reshape(-1)] = 1.0
        self.value = -np.sum(np.multiply(np.log(np.add(self.prob, self.eps)), self.label_onehot)) / self.batch_size

    def get_graident(self, parent):
        # only the logits gradient matters: the label parent is not trainable,
        # so the same expression is returned for either parent
        return (self.prob - self.label_onehot) / self.batch_size
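
# Note: fusing softmax with cross-entropy collapses the backward pass to the
# classic (softmax(z) - onehot(y)) / batch_size, avoiding a separate softmax
# Jacobian product.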
81 changes: 81 additions & 0 deletions rnn_test.py
@@ -0,0 +1,81 @@
import pytoy as pt
import numpy as np

from pytoy.layer.layer import Dense
from pytoy.core.node import Variable
from pytoy.ops.ops import SoftMax
from pytoy.ops.loss import CrossEntropyWithSoftMax
from pytoy.ops import *
from pytoy.layer.layer import RNN

max_len = 100
input_size = 16
hidden_size = 12
batch_size = 10

# get_sequence_data generates a synthetic two-class data set:
# noisy sine waves (label 1) versus noisy square waves (label 0)
from scipy import signal

def get_sequence_data(dimension=10, length=10, number_of_example=1000, train_set_ratio=0.7, seed=42):
xx = []
xx.append(np.sin(np.arange(0, 10, 10 / length)).reshape(-1, 1))
xx.append(np.array(signal.square(np.arange(0, 10, 10 / length))).reshape(-1, 1))

data = []
for i in range(2):
x = xx[i]
for j in range(number_of_example // 2):
sequence = x + np.random.normal(0, 0.6, (len(x), dimension))
label = np.array([int(i == 0)])
data.append(np.c_[sequence.reshape(1, -1), label.reshape(1, -1)])

data = np.concatenate(data, axis=0)

np.random.shuffle(data)

train_set_size = int(number_of_example * train_set_ratio)

return (data[:train_set_size, :-1].reshape(-1, length, dimension),
data[:train_set_size, -1:],
data[train_set_size:, :-1].reshape(-1, length, dimension),
data[train_set_size:, -1:])

signal_train, label_train, signal_test, label_test = get_sequence_data(length=max_len, dimension=input_size)

inputs = [Variable(dims=(batch_size, input_size), init=False, trainable=False) for i in range(max_len)]
last_step = RNN(inputs, input_size, hidden_size)
output = Dense(last_step, hidden_size, 2)
predict = SoftMax(output)

label = Variable(dims=(batch_size, 1), trainable=False)
loss = CrossEntropyWithSoftMax(output, label)

learning_rate = 0.005
adam = pt.optimizer.Adam(pt.default_graph, loss, learning_rate)

for epoch in range(30):
for i in range(0, len(signal_train), batch_size):
        # signal_train : (sample_number, max_len, input_size)
        # inputs : max_len Variables, each with dims (batch_size, input_size)
for j, iv in enumerate(inputs):
iv.set_value(np.mat(signal_train[i:i + batch_size, j]))

label.set_value(np.mat(label_train[i:i + batch_size]))
adam.step()
adam.update()

print("epoch {:d} is over".format(epoch + 1))

pred = []
for i in range(0, len(signal_test), batch_size):
for j, iv in enumerate(inputs):
iv.set_value(np.mat(signal_test[i:i + batch_size, j]))

predict.forward()
pred.append(predict.value)

pred = np.array(pred).argmax(axis=2)
label_test = label_test.reshape(-1, batch_size)

accuracy = (label_test == pred).sum() / len(signal_test)
print("epoch: {:d}, accuracy: {:.5f}".format(epoch+1, accuracy))