# NOTE(review): SOURCE is a whitespace-mangled `git diff` adding three new
# files (layer.py, loss.py, rnn_test.py).  The diff scaffolding (index hashes,
# @@ hunk headers) cannot be regenerated after the mangling, so the content is
# reconstructed below as three clearly separated file sections.

# ===========================================================================
# layer.py
# ===========================================================================
# -*- encoding: utf-8 -*-
'''
@File    :  layer.py
@Time    :  2021/11/30 14:43:25
@Author  :  sheep
@Version :  1.0
@Contact :  1173886760@qq.com
@Desc    :  Layer factories: thin helpers that wire Variables and Operators
            into the computation graph and return the output node.
'''

from ..core import *
from ..ops import *
import numpy as np


def Dense(input, feature_in, feature_out, **kargs):
    """Fully connected layer: input @ W + b.

    Args:
        input: graph node holding a (batch, feature_in) matrix.
        feature_in (int): number of input features.
        feature_out (int): number of output features.

    Keyword Args:
        name (str): name-scope prefix for the created nodes. Defaults to "".
        mean (float): mean of the weight initializer. Defaults to 0.0.
        std (float): std of the weight initializer. Defaults to 0.001.

    Returns:
        The AddOperator node producing a (batch, feature_out) output.
    """
    name = kargs.get('name', "")
    mean = kargs.get('mean', 0.0)
    std = kargs.get('std', 0.001)
    weight = Variable((feature_in, feature_out), init=True, trainable=True,
                      std=std, mean=mean, prefix=name)
    bias = Variable((1, feature_out), init=True, trainable=True, bias=True,
                    prefix=name)
    return AddOperator(MatMulOperator(input, weight, prefix=name), bias,
                       prefix=name)


def Conv(input, channel_in, channel_out, kernel_size, stride, padding,
         bias=True, **kargs):
    """2-D convolution layer.  Input must be formatted as [N, C, H, W].

    Args:
        input: graph node holding a [N, C, H, W] tensor.
        channel_in (int): number of input channels.
        channel_out (int): number of output channels.
        kernel_size (int): square kernel side length.
        stride (int): convolution stride.
        padding (int): zero padding applied to H and W.
        bias (bool): whether to add a learnable per-channel bias.

    Keyword Args:
        name (str): name-scope prefix. mean/std: weight initializer params.

    Returns:
        The output node of the convolution (plus bias when requested).
    """
    name = kargs.get('name', "")
    mean = kargs.get('mean', 0.0)
    std = kargs.get('std', 0.001)
    # weight layout: [Cin, k, k, Cout]
    weight = Variable((channel_in, kernel_size, kernel_size, channel_out),
                      init=True, trainable=True, std=std, mean=mean,
                      prefix=name)

    conv = ConvOperator(input, weight, kernel_size=kernel_size,
                        channel_in=channel_in, channel_out=channel_out,
                        stride=stride, padding=padding, prefix=name)
    if not bias:
        return conv

    # FIX(review): the original rebound the boolean parameter `bias` to the
    # bias Variable; a separate local avoids that confusing shadowing.
    # Because input is [N, C, H, W], the bias broadcasts over dims (0, 2, 3).
    bias_var = Variable((1, channel_out, 1, 1), init=True, trainable=True,
                        bias=True, prefix=name)
    return AddOperator(conv, bias_var, prefix=name)


def MaxPooling(input, kernel_size, stride, **kargs):
    """Max-pooling layer.  Input must be formatted as [N, C, H, W].

    Args:
        input: graph node holding a [N, C, H, W] tensor.
        kernel_size (int): pooling window side length.
        stride (int): pooling stride.
    """
    name = kargs.get('name', "")
    return MaxPoolingOperator(input, kernel_size=kernel_size, stride=stride,
                              prefix=name)


def AvgPooling(input, kernel_size, stride, **kargs):
    """Average-pooling layer.  Input must be formatted as [N, C, H, W].

    Args:
        input: graph node holding a [N, C, H, W] tensor.
        kernel_size (int): pooling window side length.
        stride (int): pooling stride.
    """
    name = kargs.get('name', "")
    return AvgPoolingOperator(input, kernel_size=kernel_size, stride=stride,
                              prefix=name)


def Flatten(input, **kargs):
    """Flatten all non-batch dims via a reshape to (batch, features)."""
    name = kargs.get('name', "")
    batch_size = input.dims[0]
    # FIX(review): np.product is a deprecated alias removed in NumPy 2.0;
    # np.prod is the supported spelling and behaves identically.
    feature_size = np.prod(input.dims[1:])
    return ReshapeOperator(input, to_shape=(batch_size, feature_size),
                           prefix=name)


def BatchNorm(input, **kargs):
    """Batch-normalization layer with learnable scale (gamma) and shift."""
    name = kargs.get('name', "")
    gamma = Variable(tuple([1] + list(input.dims[1:])), init=False,
                     trainable=True, prefix=name)
    # scale starts as the identity (all ones)
    gamma.set_value(np.ones(gamma.dims))

    bias = Variable(tuple([1] + list(input.dims[1:])), init=True,
                    trainable=True, bias=True, prefix=name)
    return BatchNormOperator(input, gamma, bias, prefix=name)


def ReLU(input, **kargs):
    """ReLU activation layer."""
    # BUGFIX(review): the original read kargs.get('name' "") — a missing
    # comma made the two literals concatenate to 'name' with no default, so
    # an unnamed ReLU got prefix=None instead of the empty string.
    name = kargs.get('name', "")
    return ReLUOperator(input, prefix=name)


def DropOut(input, drop_prob, **kargs):
    """DropOut layer.

    Args:
        input: graph node to apply dropout to.
        drop_prob: probability of zeroing an activation.
    """
    name = kargs.get('name', "")
    return DropOutOperator(input, drop_prob=drop_prob, prefix=name)


def append_namescope(name, scope):
    # An empty parent name intentionally yields "" (the scope is dropped so
    # anonymous blocks stay anonymous); otherwise "parent/scope".
    if name == "":
        return ""
    else:
        return '{}/{}'.format(name, scope)


# referring to
# https://github.com/weiaicunzai/pytorch-cifar100/blob/master/models/resnet.py
def BasicBlock(input, in_channels, out_channels, stride=1, **kargs):
    """Basic residual block: conv-bn-relu-conv-bn plus a (possibly
    projected) shortcut, followed by a final ReLU.

    Args:
        input: graph node holding a [N, C, H, W] tensor.
        in_channels (int): channels of the incoming tensor.
        out_channels (int): channels produced by the block.
        stride (int, optional): stride of the first conv. Defaults to 1.
    """
    name = kargs.get('name', "")

    conv1 = Conv(input, in_channels, out_channels, kernel_size=3,
                 stride=stride, padding=1, bias=False,
                 name=append_namescope(name, 'conv1'))
    bn1 = BatchNorm(conv1, name=append_namescope(name, 'bn1'))
    relu1 = ReLU(bn1, name=append_namescope(name, 'relu1'))
    conv2 = Conv(relu1, out_channels, out_channels, kernel_size=3, stride=1,
                 padding=1, bias=False, name=append_namescope(name, 'conv2'))
    bn2 = BatchNorm(conv2, name=append_namescope(name, 'bn2'))
    residual_function = bn2

    # identity shortcut, unless the spatial size or channel count changes —
    # then project with a strided 1x1 conv + bn.
    shortcut = input
    if stride != 1 or in_channels != out_channels:
        conv3 = Conv(input, in_channels, out_channels, kernel_size=1,
                     stride=stride, padding=0, bias=False,
                     name=append_namescope(name, 'conv3'))
        bn3 = BatchNorm(conv3, name=append_namescope(name, 'bn3'))
        shortcut = bn3

    return ReLU(AddOperator(residual_function, shortcut, prefix=name),
                name=append_namescope(name, 'relu3'))


def RNN(inputs, input_size, hidden_size=10, **kargs):
    """Simple Elman RNN unrolled over the given time steps.

    Args:
        inputs: list of graph nodes, one per time step, each holding a
            (batch_size, input_size) matrix.
        input_size (int): feature size of each step.
        hidden_size (int, optional): hidden state size. Defaults to 10.

    Returns:
        The node holding the hidden state after the last time step.
    """
    name = kargs.get('name', "")
    mean = kargs.get('mean', 0.0)
    std = kargs.get('std', 0.001)
    # (The original also computed len(inputs) into an unused — and
    # mislabeled — local `batch_size`; removed.)

    U = Variable(dims=(input_size, hidden_size), init=True, trainable=True,
                 std=std, mean=mean, prefix=name)
    W = Variable(dims=(hidden_size, hidden_size), init=True, trainable=True,
                 std=std, mean=mean, prefix=name)
    b = Variable(dims=(1, hidden_size), init=True, trainable=True, bias=True,
                 prefix=name)

    last_step = None
    for step_input in inputs:
        # h_t = ReLU(x_t @ U + h_{t-1} @ W + b); the W term is skipped at t=0
        h = AddOperator(MatMulOperator(step_input, U), b)
        if last_step is not None:
            h = AddOperator(MatMulOperator(last_step, W), h)
        h = ReLUOperator(h)
        last_step = h

    return last_step


# ===========================================================================
# loss.py
# ===========================================================================
# -*- encoding: utf-8 -*-
'''
@File    :  loss.py
@Time    :  2021/11/29 21:33:53
@Author  :  sheep
@Version :  1.0
@Contact :  1173886760@qq.com
@Desc    :  Loss function
'''

import numpy as np
from ..core import Node


# abstract base class for loss functions
class LossFunction(Node):
    pass


class L2Loss(LossFunction):
    """Mean squared error over the batch: sum((a - b)^2) / batch_size.

    Both parents are assumed to hold (batch, features) matrices.
    """

    def __init__(self, *parents, **kargs) -> None:
        LossFunction.__init__(self, *parents, **kargs)
        self.batch_size = parents[0].dims[0]

    def compute(self):
        self.value = np.sum(np.square(self.parents[0].value
                                      - self.parents[1].value)) / self.batch_size

    def get_graident(self, parent):
        # [sic] "graident": kept — it is the Node interface's method name.
        # d/da sum((a-b)^2)/N = 2(a-b)/N; the other parent gets the negation.
        if parent is self.parents[0]:
            return 2 * np.subtract(self.parents[0].value,
                                   self.parents[1].value) / self.batch_size
        else:
            return 2 * np.subtract(self.parents[1].value,
                                   self.parents[0].value) / self.batch_size


class CrossEntropyWithSoftMax(LossFunction):
    """Softmax + cross-entropy loss.

    parents[0] holds the logits, shape (batch_size, features);
    parents[1] holds integer class labels, one per batch row.
    """

    def __init__(self, *parents, **kargs) -> None:
        LossFunction.__init__(self, *parents, **kargs)
        self.batch_size = parents[0].dims[0]
        # epsilon keeps log() finite if a probability underflows to zero
        self.eps = 1e-9

    def compute(self):
        logits = self.parents[0].value
        # Numerically stable softmax: subtract the per-row max before exp.
        # BUGFIX(review): the row-wise max/sum must keep a (batch, 1) shape —
        # with plain ndarrays the original (batch,)-shaped results broadcast
        # against the trailing feature axis instead of the batch axis.
        # Explicit reshape (rather than keepdims=True) also works when the
        # value is an np.matrix, whose reduce methods reject keepdims.
        input_max = np.reshape(np.max(logits, axis=1), (-1, 1))
        input_exp = np.exp(np.subtract(logits, input_max))
        self.prob = input_exp / np.reshape(np.sum(input_exp, axis=1), (-1, 1))

        self.label_onehot = np.zeros_like(self.prob)
        self.label_onehot[np.arange(self.batch_size),
                          self.parents[1].value.astype(int).reshape(-1)] = 1.0
        self.value = -np.sum(np.multiply(np.log(np.add(self.prob, self.eps)),
                                         self.label_onehot)) / self.batch_size

    def get_graident(self, parent):
        # [sic] "graident": kept — it is the Node interface's method name.
        # d(loss)/d(logits) = (softmax - onehot) / batch_size; the same value
        # is returned for the label parent (labels are not trainable anyway).
        return (self.prob - self.label_onehot) / self.batch_size


# ===========================================================================
# rnn_test.py
# ===========================================================================
import pytoy as pt
import numpy as np

from pytoy.layer.layer import Dense
from pytoy.core.node import Variable
from pytoy.ops.ops import SoftMax
from pytoy.ops.loss import CrossEntropyWithSoftMax
from pytoy.ops import *
from pytoy.layer.layer import RNN

max_len = 100
input_size = 16
hidden_size = 12
batch_size = 10

# get_sequence_data is only used to generate the toy train set
from scipy import signal


def get_sequence_data(dimension=10, length=10, number_of_example=1000,
                      train_set_ratio=0.7, seed=42):
    """Build a 2-class toy sequence dataset: noisy sine vs. square waves.

    Returns (train_x, train_y, test_x, test_y) with x shaped
    (n, length, dimension) and y shaped (n, 1).
    """
    # BUGFIX(review): `seed` was accepted but never used; honor it so the
    # generated dataset is reproducible.
    np.random.seed(seed)

    xx = []
    xx.append(np.sin(np.arange(0, 10, 10 / length)).reshape(-1, 1))
    xx.append(np.array(signal.square(np.arange(0, 10, 10 / length))).reshape(-1, 1))

    data = []
    for i in range(2):
        x = xx[i]
        for j in range(number_of_example // 2):
            # broadcast the 1-D waveform across `dimension` noisy channels
            sequence = x + np.random.normal(0, 0.6, (len(x), dimension))
            label = np.array([int(i == 0)])
            data.append(np.c_[sequence.reshape(1, -1), label.reshape(1, -1)])

    data = np.concatenate(data, axis=0)

    np.random.shuffle(data)

    train_set_size = int(number_of_example * train_set_ratio)

    return (data[:train_set_size, :-1].reshape(-1, length, dimension),
            data[:train_set_size, -1:],
            data[train_set_size:, :-1].reshape(-1, length, dimension),
            data[train_set_size:, -1:])


signal_train, label_train, signal_test, label_test = get_sequence_data(
    length=max_len, dimension=input_size)

# one placeholder per time step, each holding a (batch_size, input_size) slice
inputs = [Variable(dims=(batch_size, input_size), init=False, trainable=False)
          for i in range(max_len)]
last_step = RNN(inputs, input_size, hidden_size)
output = Dense(last_step, hidden_size, 2)
predict = SoftMax(output)

# FIX(review): added init=False for consistency with every other placeholder
# Variable in this script — confirm it matches Variable's default.
label = Variable(dims=(batch_size, 1), init=False, trainable=False)
loss = CrossEntropyWithSoftMax(output, label)

learning_rate = 0.005
adam = pt.optimizer.Adam(pt.default_graph, loss, learning_rate)

for epoch in range(30):
    for i in range(0, len(signal_train), batch_size):
        # signal_train : (sample_number, max_len, input_size)
        # inputs       : max_len placeholders of (batch_size, input_size)
        # NOTE(review): np.mat is deprecated in NumPy; kept as-is because the
        # framework may rely on np.matrix semantics — confirm before moving
        # to np.asarray.
        for j, iv in enumerate(inputs):
            iv.set_value(np.mat(signal_train[i:i + batch_size, j]))

        label.set_value(np.mat(label_train[i:i + batch_size]))
        adam.step()
        adam.update()

    print("epoch {:d} is over".format(epoch + 1))

    pred = []
    for i in range(0, len(signal_test), batch_size):
        for j, iv in enumerate(inputs):
            iv.set_value(np.mat(signal_test[i:i + batch_size, j]))

        predict.forward()
        pred.append(predict.value)

    # (num_batches, batch, 2) -> class index per sample
    pred = np.array(pred).argmax(axis=2)
    label_test = label_test.reshape(-1, batch_size)

    accuracy = (label_test == pred).sum() / len(signal_test)
    print("epoch: {:d}, accuracy: {:.5f}".format(epoch + 1, accuracy))