git 项目大瘦身

This commit is contained in:
jiangzhonglian
2019-10-11 16:48:36 +08:00
commit 81abcb3f3a
169 changed files with 35021 additions and 0 deletions

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import numpy as np
class ReluActivator(object):
def forward(self, weighted_input):
#return weighted_input
return max(0, weighted_input)
def backward(self, output):
return 1 if output > 0 else 0
class IdentityActivator(object):
def forward(self, weighted_input):
return weighted_input
def backward(self, output):
return 1
class SigmoidActivator(object):
def forward(self, weighted_input):
return 1.0 / (1.0 + np.exp(-weighted_input))
def backward(self, output):
return output * (1 - output)
class TanhActivator(object):
def forward(self, weighted_input):
return 2.0 / (1.0 + np.exp(-2 * weighted_input)) - 1.0
def backward(self, output):
return 1 - output * output

863
src/py2.x/dl/bp.py Normal file
View File

@@ -0,0 +1,863 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from __future__ import print_function
import random
from numpy import *
# sigmoid 函数
def sigmoid(inX):
'''
Desc:
sigmoid 函数实现
Args:
inX --- 输入向量
Returns:
对输入向量作用 sigmoid 函数之后得到的输出
'''
return 1.0 / (1 + exp(-inX))
# 定义神经网络的节点类
class Node(object):
'''
Desc:
神经网络的节点类
'''
def __init__(self, layer_index, node_index):
'''
Desc:
初始化一个节点
Args:
layer_index --- 层的索引,也就是表示第几层
node_index --- 节点的索引,也就是表示节点的索引
Returns:
None
'''
# 设置节点所在的层的位置
self.layer_index = layer_index
# 设置层中的节点的索引
self.node_index = node_index
# 设置此节点的下游节点,也就是这个节点与下一层的哪个节点相连
self.downstream = []
# 设置此节点的上游节点,也就是哪几个节点的下游节点与此节点相连
self.upstream = []
# 此节点的输出
self.output = 0
# 此节点真实值与计算值之间的差值
self.delta = 0
def set_output(self, output):
'''
Desc:
设置节点的 output
Args:
output --- 节点的 output
Returns:
None
'''
self.output = output
def append_downstream_connection(self, conn):
'''
Desc:
添加此节点的下游节点的连接
Args:
conn --- 当前节点的下游节点的连接的 list
Returns:
None
'''
# 使用 list 的 append 方法来将 conn 中的节点添加到 downstream 中
self.downstream.append(conn)
def append_upstream_connection(self, conn):
'''
Desc:
添加此节点的上游节点的连接
Args:
conn ---- 当前节点的上游节点的连接的 list
Returns:
None
'''
# 使用 list 的 append 方法来将 conn 中的节点添加到 upstream 中
self.upstream.append(conn)
def calc_output(self):
'''
Desc:
计算节点的输出,依据 output = sigmoid(wTx)
Args:
None
Returns:
None
'''
# 使用 reduce() 函数对其中的因素求和
output = reduce(lambda ret, conn: ret + conn.upstream_node.output * conn.weight, self.upstream, 0)
# 对上游节点的 output 乘 weights 之后求和得到的结果应用 sigmoid 函数,得到当前节点的 output
self.output = sigmoid(output)
def calc_hidden_layer_delta(self):
'''
Desc:
计算隐藏层的节点的 delta
Args:
output --- 节点的 output
Returns:
None
'''
# 根据 https://www.zybuluo.com/hanbingtao/note/476663 的 式4 计算隐藏层的delta
downstream_delta = reduce(lambda ret, conn: ret + conn.downstream_node.delta * conn.weight, self.downstream, 0.0)
# 计算此节点的 delta
self.delta = self.output * (1 - self.output) * downstream_delta
def calc_output_layer_delta(self, label):
'''
Desc:
计算输出层的 delta
Args:
label --- 输入向量对应的真实标签,不是计算得到的结果
Returns:
None
'''
# 就是那输出层的 delta
self.delta = self.output * (1 - self.output) * (label - self.output)
def __str__(self):
'''
Desc:
将节点的信息打印出来
Args:
None
Returns:
None
'''
# 打印格式:第几层 - 第几个节点output 是多少delta 是多少
node_str = '%u-%u: output: %f delta: %f' % (self.layer_index, self.node_index, self.output, self.delta)
# 下游节点
downstream_str = reduce(lambda ret, conn: ret + '\n\t' + str(conn), self.downstream, '')
# 上游节点
upstream_str = reduce(lambda ret, conn: ret + '\n\t' + str(conn), self.upstream, '')
# 将本节点 + 下游节点 + 上游节点 的信息打印出来
return node_str + '\n\tdownstream:' + downstream_str + '\n\tupstream:' + upstream_str
# ConstNode 对象,为了实现一个输出恒为 1 的节点(计算偏置项 wb 时需要)
class ConstNode(object):
'''
Desc:
常数项对象,即相当于计算的时候的偏置项
'''
def __init__(self, layer_index, node_index):
'''
Desc:
初始化节点对象
Args:
layer_index --- 节点所属的层的编号
node_index --- 节点的编号
Returns:
None
'''
self.layer_index = layer_index
self.node_index = node_index
self.downstream = []
self.output = 1
def append_downstream_connection(self, conn):
'''
Desc:
添加一个到下游节点的连接
Args:
conn --- 到下游节点的连接
Returns:
None
'''
# 使用 list 的 append 方法将包含下游节点的 conn 添加到 downstream 中
self.downstream.append(conn)
def calc_hidden_layer_delta(self):
'''
Desc:
计算隐藏层的 delta
Args:
None
Returns:
None
'''
# 使用我们的 公式 4 来计算下游节点的 delta求和
downstream_delta = reduce(lambda ret, conn: ret + conn.downstream_node.delta * conn.weight, self.downstream, 0.0)
# 计算隐藏层的本节点的 delta
self.delta = self.output * (1 - self.output) * downstream_delta
def __str__(self):
'''
Desc:
将节点信息打印出来
Args:
None
Returns:
None
'''
# 将节点的信息打印出来
# 格式 第几层-第几个节点的 output
node_str = '%u-%u: output: 1' % (self.layer_index, self.node_index)
# 此节点的下游节点的信息
downstream_str = reduce(lambda ret, conn: ret + '\n\t' + str(conn), self.downstream, '')
# 将此节点与下游节点的信息组合,一起打印出来
return node_str + '\n\tdownstream:' + downstream_str
# 神经网络的层对象,负责初始化一层。此外,作为 Node 的集合对象,提供对 Node 集合的操作
class Layer(object):
'''
Desc:
神经网络的 Layer 类
'''
def __init__(self, layer_index, node_count):
'''
Desc:
神经网络的层对象的初始化
Args:
layer_index --- 层的索引
node_count --- 节点的个数
Returns:
None
'''
# 设置 层的索引
self.layer_index = layer_index
# 设置层中的节点的 list
self.nodes = []
# 将 Node 节点添加到 nodes 中
for i in range(node_count):
self.nodes.append(Node(layer_index, i))
# 将 ConstNode 节点也添加到 nodes 中
self.nodes.append(ConstNode(layer_index, node_count))
def set_output(self, data):
'''
Desc:
设置层的输出,当层是输入层时会用到
Args:
data --- 输出的值的 list
Returns:
None
'''
# 设置输入层中各个节点的 output
for i in range(len(data)):
self.nodes[i].set_output(data[i])
def calc_output(self):
'''
Desc:
计算层的输出向量
Args:
None
Returns:
None
'''
# 遍历本层的所有节点除去最后一个节点因为它是恒为常数的偏置项b
# 调用节点的 calc_output 方法来计算输出向量
for node in self.nodes[:-1]:
node.calc_output()
def dump(self):
'''
Desc:
将层信息打印出来
Args:
None
Returns:
None
'''
# 遍历层的所有的节点 nodes将节点信息打印出来
for node in self.nodes:
print(node)
# Connection 对象类,主要负责记录连接的权重,以及这个连接所关联的上下游的节点
class Connection(object):
'''
Desc:
Connection 对象,记录连接权重和连接所关联的上下游节点,注意,这里的 connection 没有 s ,不是复数
'''
def __init__(self, upstream_node, downstream_node):
'''
Desc:
初始化 Connection 对象
Args:
upstream_node --- 上游节点
downstream_node --- 下游节点
Returns:
None
'''
# 设置上游节点
self.upstream_node = upstream_node
# 设置下游节点
self.downstream_node = downstream_node
# 设置权重,这里设置的权重是 -0.1 到 0.1 之间的任何数
self.weight = random.uniform(-0.1, 0.1)
# 设置梯度 为 0.0
self.gradient = 0.0
def calc_gradient(self):
'''
Desc:
计算梯度
Args:
None
Returns:
None
'''
# 下游节点的 delta * 上游节点的 output 计算得到梯度
self.gradient = self.downstream_node.delta * self.upstream_node.output
def update_weight(self, rate):
'''
Desc:
根据梯度下降算法更新权重
Args:
rate --- 学习率 / 或者成为步长
Returns:
None
'''
# 调用计算梯度的函数来将梯度计算出来
self.calc_gradient()
# 使用梯度下降算法来更新权重
self.weight += rate * self.gradient
def get_gradient(self):
'''
Desc:
获取当前的梯度
Args:
None
Returns:
当前的梯度 gradient
'''
return self.gradient
def __str__(self):
'''
Desc:
将连接信息打印出来
Args:
None
Returns:
连接信息进行返回
'''
# 格式为:上游节点的层的索引+上游节点的节点索引 ---> 下游节点的层的索引+下游节点的节点索引,最后一个数是权重
return '(%u-%u) -> (%u-%u) = %f' % (
self.upstream_node.layer_index,
self.upstream_node.node_index,
self.downstream_node.layer_index,
self.downstream_node.node_index,
self.weight)
# Connections 对象,提供 Connection 集合操作。
class Connections(object):
'''
Desc:
Connections 对象,提供 Connection 集合的操作,看清楚后面有没有 s ,不要看错
'''
def __init__(self):
'''
Desc:
初始化 Connections 对象
Args:
None
Returns:
None
'''
# 初始化一个列表 list
self.connections = []
def add_connection(self, connection):
'''
Desc:
将 connection 中的节点信息 append 到 connections 中
Args:
None
Returns:
None
'''
self.connections.append(connection)
def dump(self):
'''
Desc:
将 Connections 的节点信息打印出来
Args:
None
Returns:
None
'''
for conn in self.connections:
print(conn)
# Network 对象,提供相应 API
class Network(object):
'''
Desc:
Network 类
'''
def __init__(self, layers):
'''
Desc:
初始化一个全连接神经网络
Args:
layers --- 二维数组,描述神经网络的每层节点数
Returns:
None
'''
# 初始化 connections使用的是 Connections 对象
self.connections = Connections()
# 初始化 layers
self.layers = []
# 我们的神经网络的层数
layer_count = len(layers)
# 节点数
node_count = 0
# 遍历所有的层,将每层信息添加到 layers 中去
for i in range(layer_count):
self.layers.append(Layer(i, layers[i]))
# 遍历除去输出层之外的所有层,将连接信息添加到 connections 对象中
for layer in range(layer_count - 1):
connections = [Connection(upstream_node, downstream_node) for upstream_node in self.layers[layer].nodes for downstream_node in self.layers[layer + 1].nodes[:-1]]
# 遍历 connections将 conn 添加到 connections 中
for conn in connections:
self.connections.add_connection(conn)
# 为下游节点添加上游节点为 conn
conn.downstream_node.append_upstream_connection(conn)
# 为上游节点添加下游节点为 conn
conn.upstream_node.append_downstream_connection(conn)
def train(self, labels, data_set, rate, epoch):
'''
Desc:
训练神经网络
Args:
labels --- 数组,训练样本标签,每个元素是一个样本的标签
data_set --- 二维数组,训练样本的特征数据。每行数据是一个样本的特征
rate --- 学习率
epoch --- 迭代次数
Returns:
None
'''
# 循环迭代 epoch 次
for i in range(epoch):
# 遍历每个训练样本
for d in range(len(data_set)):
# 使用此样本进行训练(一条样本进行训练)
self.train_one_sample(labels[d], data_set[d], rate)
# print 'sample %d training finished' % d
def train_one_sample(self, label, sample, rate):
'''
Desc:
内部函数,使用一个样本对网络进行训练
Args:
label --- 样本的标签
sample --- 样本的特征
rate --- 学习率
Returns:
None
'''
# 调用 Network 的 predict 方法,对这个样本进行预测
self.predict(sample)
# 计算根据此样本得到的结果的 delta
self.calc_delta(label)
# 更新权重
self.update_weight(rate)
def calc_delta(self, label):
'''
Desc:
计算每个节点的 delta
Args:
label --- 样本的真实值,也就是样本的标签
Returns:
None
'''
# 获取输出层的所有节点
output_nodes = self.layers[-1].nodes
# 遍历所有的 label
for i in range(len(label)):
# 计算输出层节点的 delta
output_nodes[i].calc_output_layer_delta(label[i])
# 这个用法就是切片的用法, [-2::-1] 就是将 layers 这个数组倒过来从没倒过来的时候的倒数第二个元素开始到翻转过来的倒数第一个数比如这样aaa = [1,2,3,4,5,6,7,8,9],bbb = aaa[-2::-1] ==> bbb = [8, 7, 6, 5, 4, 3, 2, 1]
# 实际上就是除掉输出层之外的所有层按照相反的顺序进行遍历
for layer in self.layers[-2::-1]:
# 遍历每层的所有节点
for node in layer.nodes:
# 计算隐藏层的 delta
node.calc_hidden_layer_delta()
def update_weight(self, rate):
'''
Desc:
更新每个连接的权重
Args:
rate --- 学习率
Returns:
None
'''
# 按照正常顺序遍历除了输出层的层
for layer in self.layers[:-1]:
# 遍历每层的所有节点
for node in layer.nodes:
# 遍历节点的下游节点
for conn in node.downstream:
# 根据下游节点来更新连接的权重
conn.update_weight(rate)
def calc_gradient(self):
'''
Desc:
计算每个连接的梯度
Args:
None
Returns:
None
'''
# 按照正常顺序遍历除了输出层之外的层
for layer in self.layers[:-1]:
# 遍历层中的所有节点
for node in layer.nodes:
# 遍历节点的下游节点
for conn in node.downstream:
# 计算梯度
conn.calc_gradient()
def get_gradient(self, label, sample):
'''
Desc:
获得网络在一个样本下,每个连接上的梯度
Args:
label --- 样本标签
sample --- 样本特征
Returns:
None
'''
# 调用 predict() 方法,利用样本的特征数据对样本进行预测
self.predict(sample)
# 计算 delta
self.calc_delta(label)
# 计算梯度
self.calc_gradient()
def predict(self, sample):
'''
Desc:
根据输入的样本预测输出值
Args:
sample --- 数组,样本的特征,也就是网络的输入向量
Returns:
使用我们的感知器规则计算网络的输出
'''
# 首先为输入层设置输出值output为样本的输入向量即不发生任何变化
self.layers[0].set_output(sample)
# 遍历除去输入层开始到最后一层
for i in range(1, len(self.layers)):
# 计算 output
self.layers[i].calc_output()
# 将计算得到的输出,也就是我们的预测值返回
return map(lambda node: node.output, self.layers[-1].nodes[:-1])
def dump(self):
'''
Desc:
打印出我们的网络信息
Args:
None
Returns:
None
'''
# 遍历所有的 layers
for layer in self.layers:
# 将所有的层的信息打印出来
layer.dump()
# # ------------------------- 至此,基本上我们把 我们的神经网络实现完成,下面还会介绍一下对应的梯度检查相关的算法,现在我们首先回顾一下我们上面写道的类及他们的作用 ------------------------
'''
1、节点类的实现 Node :负责记录和维护节点自身信息以及这个节点相关的上下游连接,实现输出值和误差项的计算。如下:
layer_index --- 节点所属的层的编号
node_index --- 节点的编号
downstream --- 下游节点
upstream ---- 上游节点
output ---- 节点的输出值
delta ------ 节点的误差项
2、ConstNode 类,偏置项类的实现:实现一个输出恒为 1 的节点(计算偏置项的时候会用到),如下:
layer_index --- 节点所属层的编号
node_index ---- 节点的编号
downstream ---- 下游节点
没有记录上游节点,因为一个偏置项的输出与上游节点的输出无关
output ----- 偏置项的输出
3、layer 类,负责初始化一层。作为的是 Node 节点的集合对象,提供对 Node 集合的操作。也就是说layer 包含的是 Node 的集合。
layer_index ---- 层的编号
node_count ----- 层所包含的节点的个数
def set_ouput() -- 设置层的输出,当层是输入层时会用到
def calc_output -- 计算层的输出向量,调用的 Node 类的 计算输出 方法
4、Connection 类:负责记录连接的权重,以及这个连接所关联的上下游节点,如下:
upstream_node --- 连接的上游节点
downstream_node -- 连接的下游节点
weight -------- random.uniform(-0.1, 0.1) 初始化为一个很小的随机数
gradient -------- 0.0 梯度,初始化为 0.0
def calc_gradient() --- 计算梯度,使用的是下游节点的 delta 与上游节点的 output 相乘计算得到
def get_gradient() ---- 获取当前的梯度
def update_weight() --- 根据梯度下降算法更新权重
5、Connections 类:提供对 Connection 集合操作,如下:
def add_connection() --- 添加一个 connection
6、Network 类:提供相应的 API如下
connections --- Connections 对象
layers -------- 神经网络的层
layer_count --- 神经网络的层数
node_count --- 节点个数
def train() --- 训练神经网络
def train_one_sample() --- 用一个样本训练网络
def calc_delta() --- 计算误差项
def update_weight() --- 更新每个连接权重
def calc_gradient() --- 计算每个连接的梯度
def get_gradient() --- 获得网络在一个样本下,每个连接上的梯度
def predict() --- 根据输入的样本预测输出值
'''
# #--------------------------------------回顾完成了,有些问题可能还是没有弄懂,没事,我们接着看下面---------------------------------------------
class Normalizer(object):
'''
Desc:
归一化工具类
Args:
object --- 对象
Returns:
None
'''
def __init__(self):
'''
Desc:
初始化
Args:
None
Returns:
None
'''
# 初始化 16 进制的数,用来判断位的,分别是
# 0x1 ---- 00000001
# 0x2 ---- 00000010
# 0x4 ---- 00000100
# 0x8 ---- 00001000
# 0x10 --- 00010000
# 0x20 --- 00100000
# 0x40 --- 01000000
# 0x80 --- 10000000
self.mask = [0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40, 0x80]
def norm(self, number):
'''
Desc:
对 number 进行规范化
Args:
number --- 要规范化的数据
Returns:
规范化之后的数据
'''
# 此方法就相当于判断一个 8 位的向量,哪一位上有数字,如果有就将这个数设置为 0.9 ,否则,设置为 0.1,通俗比较来说,就是我们这里用 0.9 表示 1用 0.1 表示 0
return map(lambda m: 0.9 if number & m else 0.1, self.mask)
def denorm(self, vec):
'''
Desc:
对我们得到的向量进行反规范化
Args:
vec --- 得到的向量
Returns:
最终的预测结果
'''
# 进行二分类,大于 0.5 就设置为 1小于 0.5 就设置为 0
binary = map(lambda i: 1 if i > 0.5 else 0, vec)
# 遍历 mask
for i in range(len(self.mask)):
binary[i] = binary[i] * self.mask[i]
# 将结果相加得到最终的预测结果
return reduce(lambda x,y: x + y, binary)
def mean_square_error(vec1, vec2):
'''
Desc:
计算平均平方误差
Args:
vec1 --- 第一个数
vec2 --- 第二个数
Returns:
返回 1/2 * (x-y)^2 计算得到的值
'''
return 0.5 * reduce(lambda a, b: a + b, map(lambda v: (v[0] - v[1]) * (v[0] - v[1]), zip(vec1, vec2)))
def gradient_check(network, sample_feature, sample_label):
'''
Desc:
梯度检查
Args:
network --- 神经网络对象
sample_feature --- 样本的特征
sample_label --- 样本的标签
Returns:
None
'''
# 计算网络误差
network_error = lambda vec1, vec2: 0.5 * reduce(lambda a, b: a + b, map(lambda v: (v[0] - v[1]) * (v[0] - v[1]), zip(vec1, vec2)))
# 获取网络在当前样本下每个连接的梯度
network.get_gradient(sample_feature, sample_label)
# 对每个权重做梯度检查
for conn in network.connections.connections:
# 获取指定连接的梯度
actual_gradient = conn.get_gradient()
# 增加一个很小的值,计算网络的误差
epsilon = 0.0001
conn.weight += epsilon
error1 = network_error(network.predict(sample_feature), sample_label)
# 减去一个很小的值,计算网络的误差
conn.weight -= 2 * epsilon # 刚才加过了一次因此这里需要减去2倍
error2 = network_error(network.predict(sample_feature), sample_label)
# 根据式6计算期望的梯度值
expected_gradient = (error2 - error1) / (2 * epsilon)
# 打印
print('expected gradient: \t%f\nactual gradient: \t%f' % (expected_gradient, actual_gradient))
def train_data_set():
'''
Desc:
获取训练数据集
Args:
None
Returns:
labels --- 训练数据集每条数据对应的标签
'''
# 调用 Normalizer() 类
normalizer = Normalizer()
# 初始化一个 list用来存储后面的数据
data_set = []
labels = []
# 0 到 256 ,其中以 8 为步长
for i in range(0, 256, 8):
# 调用 normalizer 对象的 norm 方法
n = normalizer.norm(int(random.uniform(0, 256)))
# 在 data_set 中 append n
data_set.append(n)
# 在 labels 中 append n
labels.append(n)
# 将它们返回
return labels, data_set
def train(network):
'''
Desc:
使用我们的神经网络进行训练
Args:
network --- 神经网络对象
Returns:
None
'''
# 获取训练数据集
labels, data_set = train_data_set()
# 调用 network 中的 train方法来训练我们的神经网络
network.train(labels, data_set, 0.3, 50)
def test(network, data):
'''
Desc:
对我们的全连接神经网络进行测试
Args:
network --- 神经网络对象
data ------ 测试数据集
Returns:
None
'''
# 调用 Normalizer() 类
normalizer = Normalizer()
# 调用 norm 方法,对数据进行规范化
norm_data = normalizer.norm(data)
# 对测试数据进行预测
predict_data = network.predict(norm_data)
# 将结果打印出来
print('\ttestdata(%u)\tpredict(%u)' % (data, normalizer.denorm(predict_data)))
def correct_ratio(network):
'''
Desc:
计算我们的神经网络的正确率
Args:
network --- 神经网络对象
Returns:
None
'''
normalizer = Normalizer()
correct = 0.0
for i in range(256):
if normalizer.denorm(network.predict(normalizer.norm(i))) == i:
correct += 1.0
print('correct_ratio: %.2f%%' % (correct / 256 * 100))
def gradient_check_test():
'''
Desc:
梯度检查测试
Args:
None
Returns:
None
'''
# 创建一个有 3 层的网络,每层有 2 个节点
net = Network([2, 2, 2])
# 样本的特征
sample_feature = [0.9, 0.1]
# 样本对应的标签
sample_label = [0.9, 0.1]
# 使用梯度检查来查看是否正确
gradient_check(net, sample_feature, sample_label)
if __name__ == '__main__':
'''
Desc:
主函数
Args:
None
Returns:
None
'''
# 初始化一个神经网络,输入层 8 个节点,隐藏层 3 个节点,输出层 8 个节点
net = Network([8, 3, 8])
# 训练我们的神经网络
train(net)
# 将我们的神经网络的信息打印出来
net.dump()
# 打印出神经网络的正确率
correct_ratio(net)

466
src/py2.x/dl/cnn.py Normal file
View File

@@ -0,0 +1,466 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from __future__ import print_function
import numpy as np
from activators import ReluActivator, IdentityActivator
# 获取卷积区域
def get_patch(input_array, i, j, filter_width,
filter_height, stride):
'''
从输入数组中获取本次卷积的区域,
自动适配输入为2D和3D的情况
'''
start_i = i * stride
start_j = j * stride
if input_array.ndim == 2:
return input_array[
start_i : start_i + filter_height,
start_j : start_j + filter_width]
elif input_array.ndim == 3:
return input_array[:,
start_i : start_i + filter_height,
start_j : start_j + filter_width]
# 获取一个2D区域的最大值所在的索引
def get_max_index(array):
max_i = 0
max_j = 0
max_value = array[0,0]
for i in range(array.shape[0]):
for j in range(array.shape[1]):
if array[i,j] > max_value:
max_value = array[i,j]
max_i, max_j = i, j
return max_i, max_j
# 计算卷积
def conv(input_array,
kernel_array,
output_array,
stride, bias):
'''
计算卷积自动适配输入为2D和3D的情况
conv函数实现了2维和3维数组的卷积
'''
channel_number = input_array.ndim
output_width = output_array.shape[1]
output_height = output_array.shape[0]
kernel_width = kernel_array.shape[-1]
kernel_height = kernel_array.shape[-2]
for i in range(output_height):
for j in range(output_width):
output_array[i][j] = (
get_patch(input_array, i, j, kernel_width,
kernel_height, stride) * kernel_array
).sum() + bias
# 为数组增加Zero padding
def padding(input_array, zp):
'''
为数组增加Zero padding自动适配输入为2D和3D的情况
'''
if zp == 0:
return input_array
else:
if input_array.ndim == 3:
input_width = input_array.shape[2]
input_height = input_array.shape[1]
input_depth = input_array.shape[0]
padded_array = np.zeros((
input_depth,
input_height + 2 * zp,
input_width + 2 * zp))
padded_array[:,
zp : zp + input_height,
zp : zp + input_width] = input_array
return padded_array
elif input_array.ndim == 2:
input_width = input_array.shape[1]
input_height = input_array.shape[0]
padded_array = np.zeros((
input_height + 2 * zp,
input_width + 2 * zp))
padded_array[zp : zp + input_height,
zp : zp + input_width] = input_array
return padded_array
# 对numpy数组进行element wise操作
def element_wise_op(array, op):
'''
Desc:
element_wise_op函数实现了对numpy数组进行按元素操作并将返回值写回到数组中
'''
for i in np.nditer(array,
op_flags=['readwrite']):
i[...] = op(i)
class Filter(object):
'''
Desc:
Filter类保存了卷积层的参数以及梯度并且实现了用梯度下降算法来更新参数。
我们对参数的初始化采用了常用的策略权重随机初始化为一个很小的值而偏置项初始化为0。
'''
def __init__(self, width, height, depth):
self.weights = np.random.uniform(-1e-4, 1e-4,
(depth, height, width))
self.bias = 0
self.weights_grad = np.zeros(
self.weights.shape)
self.bias_grad = 0
def __repr__(self):
return 'filter weights:\n%s\nbias:\n%s' % (
repr(self.weights), repr(self.bias))
def get_weights(self):
return self.weights
def get_bias(self):
return self.bias
def update(self, learning_rate):
self.weights -= learning_rate * self.weights_grad
self.bias -= learning_rate * self.bias_grad
class ConvLayer(object):
'''
Desc:
用ConvLayer类来实现一个卷积层。下面的代码是初始化一个卷积层可以在构造函数中设置卷积层的超参数。
'''
def __init__(self, input_width, input_height,
channel_number, filter_width,
filter_height, filter_number,
zero_padding, stride, activator,
learning_rate):
self.input_width = input_width
self.input_height = input_height
self.channel_number = channel_number
self.filter_width = filter_width
self.filter_height = filter_height
self.filter_number = filter_number
self.zero_padding = zero_padding
self.stride = stride
self.output_width = \
ConvLayer.calculate_output_size(
self.input_width, filter_width, zero_padding,
stride)
self.output_height = \
ConvLayer.calculate_output_size(
self.input_height, filter_height, zero_padding,
stride)
self.output_array = np.zeros((self.filter_number,
self.output_height, self.output_width))
self.filters = []
for i in range(filter_number):
self.filters.append(Filter(filter_width,
filter_height, self.channel_number))
self.activator = activator
self.learning_rate = learning_rate
def forward(self, input_array):
'''
Desc:
计算卷积层的输出,输出结果保存在 self.output_array
ConvLayer 类的 forward 方法实现了卷积层的前向计算(即计算根据输入来计算卷积层的输出)
'''
self.input_array = input_array
self.padded_input_array = padding(input_array,
self.zero_padding)
for f in range(self.filter_number):
filter = self.filters[f]
conv(self.padded_input_array,
filter.get_weights(), self.output_array[f],
self.stride, filter.get_bias())
element_wise_op(self.output_array,
self.activator.forward)
def backward(self, input_array, sensitivity_array,
activator):
'''
计算传递给前一层的误差项,以及计算每个权重的梯度
前一层的误差项保存在self.delta_array
梯度保存在Filter对象的weights_grad
'''
self.forward(input_array)
self.bp_sensitivity_map(sensitivity_array,
activator)
self.bp_gradient(sensitivity_array)
def update(self):
'''
按照梯度下降,更新权重
'''
for filter in self.filters:
filter.update(self.learning_rate)
def bp_sensitivity_map(self, sensitivity_array,
activator):
'''
计算传递到上一层的sensitivity map
sensitivity_array: 本层的sensitivity map
activator: 上一层的激活函数
'''
# 处理卷积步长对原始sensitivity map进行扩展
expanded_array = self.expand_sensitivity_map(
sensitivity_array)
# full卷积对sensitivitiy map进行zero padding
# 虽然原始输入的zero padding单元也会获得残差
# 但这个残差不需要继续向上传递,因此就不计算了
expanded_width = expanded_array.shape[2]
zp = (self.input_width +
self.filter_width - 1 - expanded_width) / 2
padded_array = padding(expanded_array, zp)
# 初始化delta_array用于保存传递到上一层的
# sensitivity map
self.delta_array = self.create_delta_array()
# 对于具有多个filter的卷积层来说最终传递到上一层的
# sensitivity map相当于所有的filter的
# sensitivity map之和
for f in range(self.filter_number):
filter = self.filters[f]
# 将filter权重翻转180度
flipped_weights = np.array(map(
lambda i: np.rot90(i, 2),
filter.get_weights()))
# 计算与一个filter对应的delta_array
delta_array = self.create_delta_array()
for d in range(delta_array.shape[0]):
conv(padded_array[f], flipped_weights[d],
delta_array[d], 1, 0)
self.delta_array += delta_array
# 将计算结果与激活函数的偏导数做element-wise乘法操作
derivative_array = np.array(self.input_array)
element_wise_op(derivative_array,
activator.backward)
self.delta_array *= derivative_array
def bp_gradient(self, sensitivity_array):
# 处理卷积步长对原始sensitivity map进行扩展
expanded_array = self.expand_sensitivity_map(
sensitivity_array)
for f in range(self.filter_number):
# 计算每个权重的梯度
filter = self.filters[f]
for d in range(filter.weights.shape[0]):
conv(self.padded_input_array[d],
expanded_array[f],
filter.weights_grad[d], 1, 0)
# 计算偏置项的梯度
filter.bias_grad = expanded_array[f].sum()
def expand_sensitivity_map(self, sensitivity_array):
depth = sensitivity_array.shape[0]
# 确定扩展后sensitivity map的大小
# 计算stride为1时sensitivity map的大小
expanded_width = (self.input_width -
self.filter_width + 2 * self.zero_padding + 1)
expanded_height = (self.input_height -
self.filter_height + 2 * self.zero_padding + 1)
# 构建新的sensitivity_map
expand_array = np.zeros((depth, expanded_height,
expanded_width))
# 从原始sensitivity map拷贝误差值
for i in range(self.output_height):
for j in range(self.output_width):
i_pos = i * self.stride
j_pos = j * self.stride
expand_array[:,i_pos,j_pos] = \
sensitivity_array[:,i,j]
return expand_array
def create_delta_array(self):
return np.zeros((self.channel_number,
self.input_height, self.input_width))
@staticmethod
def calculate_output_size(input_size, filter_size, zero_padding, stride):
'''
Desc:
用来确定卷积层输出的大小
'''
return (input_size - filter_size +
2 * zero_padding) / stride + 1
class MaxPoolingLayer(object):
def __init__(self, input_width, input_height,
channel_number, filter_width,
filter_height, stride):
self.input_width = input_width
self.input_height = input_height
self.channel_number = channel_number
self.filter_width = filter_width
self.filter_height = filter_height
self.stride = stride
self.output_width = (input_width -
filter_width) / self.stride + 1
self.output_height = (input_height -
filter_height) / self.stride + 1
self.output_array = np.zeros((self.channel_number,
self.output_height, self.output_width))
def forward(self, input_array):
for d in range(self.channel_number):
for i in range(self.output_height):
for j in range(self.output_width):
self.output_array[d,i,j] = (
get_patch(input_array[d], i, j,
self.filter_width,
self.filter_height,
self.stride).max())
def backward(self, input_array, sensitivity_array):
self.delta_array = np.zeros(input_array.shape)
for d in range(self.channel_number):
for i in range(self.output_height):
for j in range(self.output_width):
patch_array = get_patch(
input_array[d], i, j,
self.filter_width,
self.filter_height,
self.stride)
k, l = get_max_index(patch_array)
self.delta_array[d,
i * self.stride + k,
j * self.stride + l] = \
sensitivity_array[d,i,j]
def init_test():
a = np.array(
[[[0,1,1,0,2],
[2,2,2,2,1],
[1,0,0,2,0],
[0,1,1,0,0],
[1,2,0,0,2]],
[[1,0,2,2,0],
[0,0,0,2,0],
[1,2,1,2,1],
[1,0,0,0,0],
[1,2,1,1,1]],
[[2,1,2,0,0],
[1,0,0,1,0],
[0,2,1,0,1],
[0,1,2,2,2],
[2,1,0,0,1]]])
b = np.array(
[[[0,1,1],
[2,2,2],
[1,0,0]],
[[1,0,2],
[0,0,0],
[1,2,1]]])
cl = ConvLayer(5,5,3,3,3,2,1,2,IdentityActivator(),0.001)
cl.filters[0].weights = np.array(
[[[-1,1,0],
[0,1,0],
[0,1,1]],
[[-1,-1,0],
[0,0,0],
[0,-1,0]],
[[0,0,-1],
[0,1,0],
[1,-1,-1]]], dtype=np.float64)
cl.filters[0].bias=1
cl.filters[1].weights = np.array(
[[[1,1,-1],
[-1,-1,1],
[0,-1,1]],
[[0,1,0],
[-1,0,-1],
[-1,1,0]],
[[-1,0,0],
[-1,0,1],
[-1,0,0]]], dtype=np.float64)
return a, b, cl
def test():
a, b, cl = init_test()
cl.forward(a)
print(cl.output_array)
def test_bp():
a, b, cl = init_test()
cl.backward(a, b, IdentityActivator())
cl.update()
print(cl.filters[0])
print(cl.filters[1])
def gradient_check():
'''
梯度检查
'''
# 设计一个误差函数,取所有节点输出项之和
error_function = lambda o: o.sum()
# 计算forward值
a, b, cl = init_test()
cl.forward(a)
# 求取sensitivity map
sensitivity_array = np.ones(cl.output_array.shape,
dtype=np.float64)
# 计算梯度
cl.backward(a, sensitivity_array,
IdentityActivator())
# 检查梯度
epsilon = 10e-4
for d in range(cl.filters[0].weights_grad.shape[0]):
for i in range(cl.filters[0].weights_grad.shape[1]):
for j in range(cl.filters[0].weights_grad.shape[2]):
cl.filters[0].weights[d,i,j] += epsilon
cl.forward(a)
err1 = error_function(cl.output_array)
cl.filters[0].weights[d,i,j] -= 2*epsilon
cl.forward(a)
err2 = error_function(cl.output_array)
expect_grad = (err1 - err2) / (2 * epsilon)
cl.filters[0].weights[d,i,j] += epsilon
print('weights(%d,%d,%d): expected - actural %f - %f' % (
d, i, j, expect_grad, cl.filters[0].weights_grad[d,i,j]))
def init_pool_test():
a = np.array(
[[[1,1,2,4],
[5,6,7,8],
[3,2,1,0],
[1,2,3,4]],
[[0,1,2,3],
[4,5,6,7],
[8,9,0,1],
[3,4,5,6]]], dtype=np.float64)
b = np.array(
[[[1,2],
[2,4]],
[[3,5],
[8,2]]], dtype=np.float64)
mpl = MaxPoolingLayer(4,4,2,2,2,2)
return a, b, mpl
def test_pool():
a, b, mpl = init_pool_test()
mpl.forward(a)
print('input array:\n%s\noutput array:\n%s' % (a, mpl.output_array))
def test_pool_bp():
a, b, mpl = init_pool_test()
mpl.backward(a, b)
print('input array:\n%s\nsensitivity array:\n%s\ndelta array:\n%s' % (a, b, mpl.delta_array))

232
src/py2.x/dl/fc.py Normal file
View File

@@ -0,0 +1,232 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from __future__ import print_function
import random
import numpy as np
from activators import SigmoidActivator, IdentityActivator
try:
reduce # Python 2
except NameError: # Python 3
from functools import reduce
# 全连接层实现类
class FullConnectedLayer(object):
def __init__(self, input_size, output_size,
activator):
'''
构造函数
input_size: 本层输入向量的维度
output_size: 本层输出向量的维度
activator: 激活函数
'''
self.input_size = input_size
self.output_size = output_size
self.activator = activator
# 权重数组W
self.W = np.random.uniform(-0.1, 0.1,
(output_size, input_size))
# 偏置项b
self.b = np.zeros((output_size, 1))
# 输出向量
self.output = np.zeros((output_size, 1))
def forward(self, input_array):
'''
前向计算
input_array: 输入向量维度必须等于input_size
'''
# 式2
self.input = input_array
self.output = self.activator.forward(
np.dot(self.W, input_array) + self.b)
def backward(self, delta_array):
'''
反向计算W和b的梯度
delta_array: 从上一层传递过来的误差项
'''
# 式8
self.delta = self.activator.backward(self.input) * np.dot(
self.W.T, delta_array)
self.W_grad = np.dot(delta_array, self.input.T)
self.b_grad = delta_array
def update(self, learning_rate):
'''
使用梯度下降算法更新权重
'''
self.W += learning_rate * self.W_grad
self.b += learning_rate * self.b_grad
def dump(self):
print('W: %s\nb:%s' % (self.W, self.b))
# 神经网络类
class Network(object):
def __init__(self, layers):
'''
构造函数
'''
self.layers = []
for i in range(len(layers) - 1):
self.layers.append(
FullConnectedLayer(
layers[i], layers[i+1],
SigmoidActivator()
)
)
def predict(self, sample):
'''
使用神经网络实现预测
sample: 输入样本
'''
output = sample
for layer in self.layers:
layer.forward(output)
output = layer.output
return output
def train(self, labels, data_set, rate, epoch):
'''
训练函数
labels: 样本标签
data_set: 输入样本
rate: 学习速率
epoch: 训练轮数
'''
for i in range(epoch):
for d in range(len(data_set)):
self.train_one_sample(labels[d],
data_set[d], rate)
def train_one_sample(self, label, sample, rate):
self.predict(sample)
self.calc_gradient(label)
self.update_weight(rate)
def calc_gradient(self, label):
delta = self.layers[-1].activator.backward(
self.layers[-1].output
) * (label - self.layers[-1].output)
for layer in self.layers[::-1]:
layer.backward(delta)
delta = layer.delta
return delta
def update_weight(self, rate):
for layer in self.layers:
layer.update(rate)
def dump(self):
for layer in self.layers:
layer.dump()
def loss(self, output, label):
return 0.5 * ((label - output) * (label - output)).sum()
def gradient_check(self, sample_feature, sample_label):
'''
梯度检查
network: 神经网络对象
sample_feature: 样本的特征
sample_label: 样本的标签
'''
# 获取网络在当前样本下每个连接的梯度
self.predict(sample_feature)
self.calc_gradient(sample_label)
# 检查梯度
epsilon = 10e-4
for fc in self.layers:
for i in range(fc.W.shape[0]):
for j in range(fc.W.shape[1]):
fc.W[i,j] += epsilon
output = self.predict(sample_feature)
err1 = self.loss(sample_label, output)
fc.W[i,j] -= 2*epsilon
output = self.predict(sample_feature)
err2 = self.loss(sample_label, output)
expect_grad = (err1 - err2) / (2 * epsilon)
fc.W[i,j] += epsilon
print('weights(%d,%d): expected - actural %.4e - %.4e' % (
i, j, expect_grad, fc.W_grad[i,j]))
from bp import train_data_set
def transpose(args):
return map(
lambda arg: map(
lambda line: np.array(line).reshape(len(line), 1)
, arg)
, args
)
class Normalizer(object):
def __init__(self):
self.mask = [
0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40, 0x80
]
def norm(self, number):
data = map(lambda m: 0.9 if number & m else 0.1, self.mask)
return np.array(data).reshape(8, 1)
def denorm(self, vec):
binary = map(lambda i: 1 if i > 0.5 else 0, vec[:,0])
for i in range(len(self.mask)):
binary[i] = binary[i] * self.mask[i]
return reduce(lambda x,y: x + y, binary)
def train_data_set():
normalizer = Normalizer()
data_set = []
labels = []
for i in range(0, 256):
n = normalizer.norm(i)
data_set.append(n)
labels.append(n)
return labels, data_set
def correct_ratio(network):
normalizer = Normalizer()
correct = 0.0;
for i in range(256):
if normalizer.denorm(network.predict(normalizer.norm(i))) == i:
correct += 1.0
print('correct_ratio: %.2f%%' % (correct / 256 * 100))
def test():
labels, data_set = transpose(train_data_set())
net = Network([8, 3, 8])
rate = 0.5
mini_batch = 20
epoch = 10
for i in range(epoch):
net.train(labels, data_set, rate, mini_batch)
print('after epoch %d loss: %f' % (
(i + 1),
net.loss(labels[-1], net.predict(data_set[-1]))
))
rate /= 2
correct_ratio(net)
def gradient_check():
'''
梯度检查
'''
labels, data_set = transpose(train_data_set())
net = Network([8, 3, 8])
net.gradient_check(data_set[0], labels[0])
return net

122
src/py2.x/dl/linear_unit.py Normal file
View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# 引入 Perceptron 类
from __future__ import print_function
from perceptron import Perceptron
# 定义激活函数 f
f = lambda x: x
class LinearUnit(Perceptron):
'''
Desc:
线性单元类
Args:
Perceptron —— 感知器
Returns:
None
'''
def __init__(self, input_num):
'''
Desc:
初始化线性单元,设置输入参数的个数
Args:
input_num —— 输入参数的个数
Returns:
None
'''
# 初始化我们的感知器类,设置输入参数的个数 input_num 和 激活函数 f
Perceptron.__init__(self, input_num, f)
# 构造简单的数据集
def get_training_dataset():
'''
Desc:
构建一个简单的训练数据集
Args:
None
Returns:
input_vecs —— 训练数据集的特征部分
labels —— 训练数据集的数据对应的标签,是一一对应的
'''
# 构建数据集,输入向量列表,每一项是工作年限
input_vecs = [[5], [3], [8], [1.4], [10.1]]
# 期望的输出列表,也就是输入向量的对应的标签,与工作年限对应的收入年薪
labels = [5500, 2300, 7600, 1800, 11400]
return input_vecs, labels
# 使用我们的训练数据集对线性单元进行训练
def train_linear_unit():
'''
Desc:
使用训练数据集对我们的线性单元进行训练
Args:
None
Returns:
lu —— 返回训练好的线性单元
'''
# 创建感知器对象,输入参数的个数也就是特征数为 1工作年限
lu = LinearUnit(1)
# 获取构建的数据集
input_vecs, labels = get_training_dataset()
# 训练感知器,迭代 10 轮,学习率为 0.01
lu.train(input_vecs, labels, 10, 0.01)
# 返回训练好的线性单元
return lu
# 将图像画出来
def plot(linear_unit):
'''
Desc:
将我们训练好的线性单元对数据的分类情况作图画出来
Args:
linear_unit —— 训练好的线性单元
Returns:
None
'''
# 引入绘图的库
import matplotlib.pyplot as plt
# 获取训练数据:特征 input_vecs 与 对应的标签 labels
input_vecs, labels = get_training_dataset()
# figure() 创建一个 Figure 对象,与用户交互的整个窗口,这个 figure 中容纳着 subplots
fig = plt.figure()
# 在 figure 对象中创建 1行1列中的第一个图
ax = fig.add_subplot(111)
# scatter(x, y) 绘制散点图,其中的 x,y 是相同长度的数组序列
ax.scatter(map(lambda x: x[0], input_vecs), labels)
# 设置权重
weights = linear_unit.weights
# 设置偏置项
bias = linear_unit.bias
# range(start, stop, step) 从 start 开始,到 stop 结束,步长为 step
x = range(0, 12, 1)
# 计算感知器对输入计算得到的值
y = map(lambda x: weights[0] * x + bias, x)
# 将图画出来
ax.plot(x, y)
# 将最终的图展示出来
plt.show()
if __name__ == '__main__':
'''
Desc:
main 函数,训练我们的线性单元,并进行预测
Args:
None
Returns:
None
'''
# 首先训练我们的线性单元
linear_unit = train_linear_unit()
# 打印训练获得的权重 和 偏置
print(linear_unit)
# 测试
print('Work 3.4 years, monthly salary = %.2f' % linear_unit.predict([3.4]))
print('Work 15 years, monthly salary = %.2f' % linear_unit.predict([15]))
print('Work 1.5 years, monthly salary = %.2f' % linear_unit.predict([1.5]))
print('Work 6.3 years, monthly salary = %.2f' % linear_unit.predict([6.3]))
plot(linear_unit)

334
src/py2.x/dl/lstm.py Normal file
View File

@@ -0,0 +1,334 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from __future__ import print_function
import matplotlib.pyplot as plt
import numpy as np
from cnn import element_wise_op
from activators import SigmoidActivator, TanhActivator, IdentityActivator
class LstmLayer(object):
def __init__(self, input_width, state_width,
learning_rate):
self.input_width = input_width
self.state_width = state_width
self.learning_rate = learning_rate
# 门的激活函数
self.gate_activator = SigmoidActivator()
# 输出的激活函数
self.output_activator = TanhActivator()
# 当前时刻初始化为t0
self.times = 0
# 各个时刻的单元状态向量c
self.c_list = self.init_state_vec()
# 各个时刻的输出向量h
self.h_list = self.init_state_vec()
# 各个时刻的遗忘门f
self.f_list = self.init_state_vec()
# 各个时刻的输入门i
self.i_list = self.init_state_vec()
# 各个时刻的输出门o
self.o_list = self.init_state_vec()
# 各个时刻的即时状态c~
self.ct_list = self.init_state_vec()
# 遗忘门权重矩阵Wfh, Wfx, 偏置项bf
self.Wfh, self.Wfx, self.bf = (
self.init_weight_mat())
# 输入门权重矩阵Wfh, Wfx, 偏置项bf
self.Wih, self.Wix, self.bi = (
self.init_weight_mat())
# 输出门权重矩阵Wfh, Wfx, 偏置项bf
self.Woh, self.Wox, self.bo = (
self.init_weight_mat())
# 单元状态权重矩阵Wfh, Wfx, 偏置项bf
self.Wch, self.Wcx, self.bc = (
self.init_weight_mat())
def init_state_vec(self):
'''
初始化保存状态的向量
'''
state_vec_list = []
state_vec_list.append(np.zeros(
(self.state_width, 1)))
return state_vec_list
def init_weight_mat(self):
'''
初始化权重矩阵
'''
Wh = np.random.uniform(-1e-4, 1e-4,
(self.state_width, self.state_width))
Wx = np.random.uniform(-1e-4, 1e-4,
(self.state_width, self.input_width))
b = np.zeros((self.state_width, 1))
return Wh, Wx, b
def forward(self, x):
'''
根据式1-式6进行前向计算
'''
self.times += 1
# 遗忘门
fg = self.calc_gate(x, self.Wfx, self.Wfh,
self.bf, self.gate_activator)
self.f_list.append(fg)
# 输入门
ig = self.calc_gate(x, self.Wix, self.Wih,
self.bi, self.gate_activator)
self.i_list.append(ig)
# 输出门
og = self.calc_gate(x, self.Wox, self.Woh,
self.bo, self.gate_activator)
self.o_list.append(og)
# 即时状态
ct = self.calc_gate(x, self.Wcx, self.Wch,
self.bc, self.output_activator)
self.ct_list.append(ct)
# 单元状态
c = fg * self.c_list[self.times - 1] + ig * ct
self.c_list.append(c)
# 输出
h = og * self.output_activator.forward(c)
self.h_list.append(h)
def calc_gate(self, x, Wx, Wh, b, activator):
'''
计算门
'''
h = self.h_list[self.times - 1] # 上次的LSTM输出
net = np.dot(Wh, h) + np.dot(Wx, x) + b
gate = activator.forward(net)
return gate
def backward(self, x, delta_h, activator):
'''
实现LSTM训练算法
'''
self.calc_delta(delta_h, activator)
self.calc_gradient(x)
def update(self):
'''
按照梯度下降,更新权重
'''
self.Wfh -= self.learning_rate * self.Whf_grad
self.Wfx -= self.learning_rate * self.Whx_grad
self.bf -= self.learning_rate * self.bf_grad
self.Wih -= self.learning_rate * self.Whi_grad
self.Wix -= self.learning_rate * self.Whi_grad
self.bi -= self.learning_rate * self.bi_grad
self.Woh -= self.learning_rate * self.Wof_grad
self.Wox -= self.learning_rate * self.Wox_grad
self.bo -= self.learning_rate * self.bo_grad
self.Wch -= self.learning_rate * self.Wcf_grad
self.Wcx -= self.learning_rate * self.Wcx_grad
self.bc -= self.learning_rate * self.bc_grad
def calc_delta(self, delta_h, activator):
# 初始化各个时刻的误差项
self.delta_h_list = self.init_delta() # 输出误差项
self.delta_o_list = self.init_delta() # 输出门误差项
self.delta_i_list = self.init_delta() # 输入门误差项
self.delta_f_list = self.init_delta() # 遗忘门误差项
self.delta_ct_list = self.init_delta() # 即时输出误差项
# 保存从上一层传递下来的当前时刻的误差项
self.delta_h_list[-1] = delta_h
# 迭代计算每个时刻的误差项
for k in range(self.times, 0, -1):
self.calc_delta_k(k)
def init_delta(self):
'''
初始化误差项
'''
delta_list = []
for i in range(self.times + 1):
delta_list.append(np.zeros(
(self.state_width, 1)))
return delta_list
def calc_delta_k(self, k):
'''
根据k时刻的delta_h计算k时刻的delta_f、
delta_i、delta_o、delta_ct以及k-1时刻的delta_h
'''
# 获得k时刻前向计算的值
ig = self.i_list[k]
og = self.o_list[k]
fg = self.f_list[k]
ct = self.ct_list[k]
c = self.c_list[k]
c_prev = self.c_list[k-1]
tanh_c = self.output_activator.forward(c)
delta_k = self.delta_h_list[k]
# 根据式9计算delta_o
delta_o = (delta_k * tanh_c *
self.gate_activator.backward(og))
delta_f = (delta_k * og *
(1 - tanh_c * tanh_c) * c_prev *
self.gate_activator.backward(fg))
delta_i = (delta_k * og *
(1 - tanh_c * tanh_c) * ct *
self.gate_activator.backward(ig))
delta_ct = (delta_k * og *
(1 - tanh_c * tanh_c) * ig *
self.output_activator.backward(ct))
delta_h_prev = (
np.dot(delta_o.transpose(), self.Woh) +
np.dot(delta_i.transpose(), self.Wih) +
np.dot(delta_f.transpose(), self.Wfh) +
np.dot(delta_ct.transpose(), self.Wch)
).transpose()
# 保存全部delta值
self.delta_h_list[k-1] = delta_h_prev
self.delta_f_list[k] = delta_f
self.delta_i_list[k] = delta_i
self.delta_o_list[k] = delta_o
self.delta_ct_list[k] = delta_ct
def calc_gradient(self, x):
# 初始化遗忘门权重梯度矩阵和偏置项
self.Wfh_grad, self.Wfx_grad, self.bf_grad = (
self.init_weight_gradient_mat())
# 初始化输入门权重梯度矩阵和偏置项
self.Wih_grad, self.Wix_grad, self.bi_grad = (
self.init_weight_gradient_mat())
# 初始化输出门权重梯度矩阵和偏置项
self.Woh_grad, self.Wox_grad, self.bo_grad = (
self.init_weight_gradient_mat())
# 初始化单元状态权重梯度矩阵和偏置项
self.Wch_grad, self.Wcx_grad, self.bc_grad = (
self.init_weight_gradient_mat())
# 计算对上一次输出h的权重梯度
for t in range(self.times, 0, -1):
# 计算各个时刻的梯度
(Wfh_grad, bf_grad,
Wih_grad, bi_grad,
Woh_grad, bo_grad,
Wch_grad, bc_grad) = (
self.calc_gradient_t(t))
# 实际梯度是各时刻梯度之和
self.Wfh_grad += Wfh_grad
self.bf_grad += bf_grad
self.Wih_grad += Wih_grad
self.bi_grad += bi_grad
self.Woh_grad += Woh_grad
self.bo_grad += bo_grad
self.Wch_grad += Wch_grad
self.bc_grad += bc_grad
# 计算对本次输入x的权重梯度
xt = x.transpose()
self.Wfx_grad = np.dot(self.delta_f_list[-1], xt)
self.Wix_grad = np.dot(self.delta_i_list[-1], xt)
self.Wox_grad = np.dot(self.delta_o_list[-1], xt)
self.Wcx_grad = np.dot(self.delta_ct_list[-1], xt)
def init_weight_gradient_mat(self):
'''
初始化权重矩阵
'''
Wh_grad = np.zeros((self.state_width,
self.state_width))
Wx_grad = np.zeros((self.state_width,
self.input_width))
b_grad = np.zeros((self.state_width, 1))
return Wh_grad, Wx_grad, b_grad
def calc_gradient_t(self, t):
'''
计算每个时刻t权重的梯度
'''
h_prev = self.h_list[t-1].transpose()
Wfh_grad = np.dot(self.delta_f_list[t], h_prev)
bf_grad = self.delta_f_list[t]
Wih_grad = np.dot(self.delta_i_list[t], h_prev)
bi_grad = self.delta_f_list[t]
Woh_grad = np.dot(self.delta_o_list[t], h_prev)
bo_grad = self.delta_f_list[t]
Wch_grad = np.dot(self.delta_ct_list[t], h_prev)
bc_grad = self.delta_ct_list[t]
return Wfh_grad, bf_grad, Wih_grad, bi_grad, \
Woh_grad, bo_grad, Wch_grad, bc_grad
def reset_state(self):
# 当前时刻初始化为t0
self.times = 0
# 各个时刻的单元状态向量c
self.c_list = self.init_state_vec()
# 各个时刻的输出向量h
self.h_list = self.init_state_vec()
# 各个时刻的遗忘门f
self.f_list = self.init_state_vec()
# 各个时刻的输入门i
self.i_list = self.init_state_vec()
# 各个时刻的输出门o
self.o_list = self.init_state_vec()
# 各个时刻的即时状态c~
self.ct_list = self.init_state_vec()
def data_set():
x = [np.array([[1], [2], [3]]),
np.array([[2], [3], [4]])]
d = np.array([[1], [2]])
return x, d
def gradient_check():
'''
梯度检查
'''
# 设计一个误差函数,取所有节点输出项之和
error_function = lambda o: o.sum()
lstm = LstmLayer(3, 2, 1e-3)
# 计算forward值
x, d = data_set()
lstm.forward(x[0])
lstm.forward(x[1])
# 求取sensitivity map
sensitivity_array = np.ones(lstm.h_list[-1].shape,
dtype=np.float64)
# 计算梯度
lstm.backward(x[1], sensitivity_array, IdentityActivator())
# 检查梯度
epsilon = 10e-4
for i in range(lstm.Wfh.shape[0]):
for j in range(lstm.Wfh.shape[1]):
lstm.Wfh[i,j] += epsilon
lstm.reset_state()
lstm.forward(x[0])
lstm.forward(x[1])
err1 = error_function(lstm.h_list[-1])
lstm.Wfh[i,j] -= 2*epsilon
lstm.reset_state()
lstm.forward(x[0])
lstm.forward(x[1])
err2 = error_function(lstm.h_list[-1])
expect_grad = (err1 - err2) / (2 * epsilon)
lstm.Wfh[i,j] += epsilon
print('weights(%d,%d): expected - actural %.4e - %.4e' % (
i, j, expect_grad, lstm.Wfh_grad[i,j]))
return lstm
def test():
l = LstmLayer(3, 2, 1e-3)
x, d = data_set()
l.forward(x[0])
l.forward(x[1])
l.backward(x[1], d, IdentityActivator())
return l

177
src/py2.x/dl/mnist.py Normal file
View File

@@ -0,0 +1,177 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from __future__ import print_function
import struct
from fc import *
from datetime import datetime
# 数据加载器基类
class Loader(object):
def __init__(self, path, count):
'''
初始化加载器
path: 数据文件路径
count: 文件中的样本个数
'''
self.path = path
self.count = count
def get_file_content(self):
'''
读取文件内容
'''
f = open(self.path, 'rb')
content = f.read()
f.close()
return content
def to_int(self, byte):
'''
将unsigned byte字符转换为整数
'''
return struct.unpack('B', byte)[0]
# 图像数据加载器
class ImageLoader(Loader):
def get_picture(self, content, index):
'''
内部函数,从文件中获取图像
'''
start = index * 28 * 28 + 16
picture = []
for i in range(28):
picture.append([])
for j in range(28):
picture[i].append(
self.to_int(content[start + i * 28 + j]))
return picture
def get_one_sample(self, picture):
'''
内部函数,将图像转化为样本的输入向量
'''
sample = []
for i in range(28):
for j in range(28):
sample.append(picture[i][j])
return sample
def load(self):
'''
加载数据文件,获得全部样本的输入向量
'''
content = self.get_file_content()
data_set = []
for index in range(self.count):
data_set.append(
self.get_one_sample(
self.get_picture(content, index)))
return data_set
# 标签数据加载器
class LabelLoader(Loader):
def load(self):
'''
加载数据文件,获得全部样本的标签向量
'''
content = self.get_file_content()
labels = []
for index in range(self.count):
labels.append(self.norm(content[index + 8]))
return labels
def norm(self, label):
'''
内部函数将一个值转换为10维标签向量
'''
label_vec = []
label_value = self.to_int(label)
for i in range(10):
if i == label_value:
label_vec.append(0.9)
else:
label_vec.append(0.1)
return label_vec
def get_training_data_set():
'''
获得训练数据集
'''
image_loader = ImageLoader('train-images-idx3-ubyte', 60000)
label_loader = LabelLoader('train-labels-idx1-ubyte', 60000)
return image_loader.load(), label_loader.load()
def get_test_data_set():
'''
获得测试数据集
'''
image_loader = ImageLoader('t10k-images-idx3-ubyte', 10000)
label_loader = LabelLoader('t10k-labels-idx1-ubyte', 10000)
return image_loader.load(), label_loader.load()
def show(sample):
str = ''
for i in range(28):
for j in range(28):
if sample[i*28+j] != 0:
str += '*'
else:
str += ' '
str += '\n'
print(str)
def get_result(vec):
max_value_index = 0
max_value = 0
for i in range(len(vec)):
if vec[i] > max_value:
max_value = vec[i]
max_value_index = i
return max_value_index
def evaluate(network, test_data_set, test_labels):
error = 0
total = len(test_data_set)
for i in range(total):
label = get_result(test_labels[i])
predict = get_result(network.predict(test_data_set[i]))
if label != predict:
error += 1
return float(error) / float(total)
def now():
return datetime.now().strftime('%c')
def train_and_evaluate():
last_error_ratio = 1.0
epoch = 0
train_data_set, train_labels = transpose(get_training_data_set())
test_data_set, test_labels = transpose(get_test_data_set())
network = Network([784, 100, 10])
while True:
epoch += 1
network.train(train_labels, train_data_set, 0.01, 1)
print('%s epoch %d finished, loss %f' % (now(), epoch,
network.loss(train_labels[-1], network.predict(train_data_set[-1]))))
if epoch % 2 == 0:
error_ratio = evaluate(network, test_data_set, test_labels)
print('%s after epoch %d, error ratio is %f' % (now(), epoch, error_ratio))
if error_ratio > last_error_ratio:
break
else:
last_error_ratio = error_ratio
if __name__ == '__main__':
train_and_evaluate()

187
src/py2.x/dl/perceptron.py Normal file
View File

@@ -0,0 +1,187 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# 神经元 / 感知器
from __future__ import print_function
class Perceptron():
'''
Desc:
感知器类
Args:
None
Returns:
None
'''
def __init__(self, input_num, activator):
'''
Desc:
初始化感知器
Args:
input_num —— 输入参数的个数
activator —— 激活函数
Returns:
None
'''
# 设置的激活函数
self.activator = activator
# 权重向量初始化为 0
self.weights = [0.0 for _ in range(input_num)]
# 偏置项初始化为 0
self.bias = 0.0
def __str__(self):
'''
Desc:
将感知器信息打印出来
Args:
None
Returns:
None
'''
return('weights\t:%s\nbias\t:%f\n' % (self.weights, self.bias))
def predict(self, input_vec):
'''
Desc:
输入向量,输出感知器的计算结果
Args:
input_vec —— 输入向量
Returns:
感知器的计算结果
'''
# 将输入向量的计算结果返回
# 调用 激活函数 activator ,将输入向量输入,计算感知器的结果
# reduce() 函数是 python 2 的内置函数,从 python 3 开始移到了 functools 模块
# reduce() 从左到右对一个序列的项累计地应用有两个参数的函数,以此合并序列到一个单一值,例如 reduce(lambda x,y: x+y, [1,2,3,4,5]) 计算的就是 ((((1+2)+3)+4)+5)
# map() 接收一个函数 f 和一个 list ,并通过把函数 f 依次作用在 list 的每个元素上,得到一个新的 list 返回。比如我们的 f 函数是计算平方, map(f, [1,2,3,4,5]) ===> 返回 [1,4,9,16,25]
# zip() 接收任意多个(包括 0 个和 1个序列作为参数返回一个 tuple 列表。例x = [1,2,3] y = [4,5,6] z = [7,8,9] xyz = zip(x, y, z) ===> [(1,4,7), (2,5,8), (3,6,9)]
return self.activator(reduce(lambda a, b: a + b,map(lambda (x, w): x * w, zip(input_vec, self.weights)), 0.0) + self.bias)
def train(self, input_vecs, labels, iteration, rate):
'''
Desc:
输入训练数据:一组向量、与每个向量对应的 label; 以及训练轮数、学习率
Args:
input_vec —— 输入向量
labels —— 数据对应的标签
iteration —— 训练的迭代轮数
rate —— 学习率
Returns:
None
'''
for i in range(iteration):
self._one_iteration(input_vecs, labels, rate)
def _one_iteration(self, input_vecs, labels, rate):
'''
Desc:
训练过程的一次迭代过程
Args:
input_vecs —— 输入向量
labels —— 数据对应的标签
rate —— 学习率
Returns:
None
'''
# zip() 接收任意多个(包括 0 个和 1个序列作为参数返回一个 tuple 列表。例x = [1,2,3] y = [4,5,6] z = [7,8,9] xyz = zip(x, y, z) ===> [(1,4,7), (2,5,8), (3,6,9)]
samples = zip(input_vecs, labels)
# 对每个样本,按照感知器规则更新权重
for (input_vec, label) in samples:
# 计算感知器在当前权重下的输出
output = self.predict(input_vec)
# 更新权重
output = self._update_weights(input_vec, output, label, rate)
def _update_weights(self, input_vec, output, label, rate):
'''
Desc:
按照感知器规则更新权重
Args:
input_vec —— 输入向量
output —— 经过感知器规则计算得到的输出
label —— 输入向量对应的标签
rate —— 学习率
Returns:
None
'''
# 利用感知器规则更新权重
delta = label - output
# map() 接收一个函数 f 和一个 list ,并通过把函数 f 依次作用在 list 的每个元素上,得到一个新的 list 返回。比如我们的 f 函数是计算平方, map(f, [1,2,3,4,5]) ===> 返回 [1,4,9,16,25]
# zip() 接收任意多个(包括 0 个和 1个序列作为参数返回一个 tuple 列表。例x = [1,2,3] y = [4,5,6] z = [7,8,9] xyz = zip(x, y, z) ===> [(1,4,7), (2,5,8), (3,6,9)]
self.weights = map(lambda (x, w): w + rate * delta * x, zip(input_vec, self.weights))
# 更新 bias
self.bias += rate * delta
def f(x):
'''
Desc:
定义激活函数 f
Args:
x —— 输入向量
Returns:
(实现阶跃函数)大于 0 返回 1否则返回 0
'''
return 1 if x > 0 else 0
def get_training_dataset():
'''
Desc:
基于 and 真值表来构建/获取训练数据集
Args:
None
Returns:
input_vecs —— 输入向量
labels —— 输入向量对应的标签
'''
# 构建训练数据,输入向量的列表
input_vecs = [[1,1],[0,0],[1,0],[0,1]]
# 期望的输出列表,也就是上面的输入向量的列表中数据对应的标签,是一一对应的
labels = [1, 0, 0, 0]
return input_vecs, labels
def train_and_perceptron():
'''
Desc:
使用 and 真值表来训练我们的感知器
Args:
None
Returns:
p —— 返回训练好的感知器
'''
# 创建感知器,输入参数的个数是 2 个(因为 and 是个二元函数),激活函数为 f
p = Perceptron(2, f)
# 进行训练,迭代 10 轮,学习速率是我们设定的 rate ,为 0.1
input_vecs, labels = get_training_dataset()
p.train(input_vecs, labels, 10, 0.1)
# 返回训练好的感知器
return p
if __name__ == '__main__':
'''
Desc:
主函数,调用上面返回的训练好的感知器进行预测
Args:
None
Returns:
None
'''
# 训练 and 感知器
and_perceptron = train_and_perceptron()
# 打印训练获得的权重
print(and_perceptron)
# 测试
print('1 and 1 = %d' % and_perceptron.predict([1, 1]))
print('0 and 0 = %d' % and_perceptron.predict([0, 0]))
print('1 and 0 = %d' % and_perceptron.predict([1, 0]))
print('0 and 1 = %d' % and_perceptron.predict([0, 1]))

185
src/py2.x/dl/recursive.py Normal file
View File

@@ -0,0 +1,185 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from __future__ import print_function
import numpy as np
from activators import IdentityActivator
class TreeNode(object):
def __init__(self, data, children=[], children_data=[]):
self.parent = None
self.children = children
self.children_data = children_data
self.data = data
for child in children:
child.parent = self
# 递归神经网络实现
class RecursiveLayer(object):
def __init__(self, node_width, child_count,
activator, learning_rate):
'''
递归神经网络构造函数
node_width: 表示每个节点的向量的维度
child_count: 每个父节点有几个子节点
activator: 激活函数对象
learning_rate: 梯度下降算法学习率
'''
self.node_width = node_width
self.child_count = child_count
self.activator = activator
self.learning_rate = learning_rate
# 权重数组W
self.W = np.random.uniform(-1e-4, 1e-4,
(node_width, node_width * child_count))
# 偏置项b
self.b = np.zeros((node_width, 1))
# 递归神经网络生成的树的根节点
self.root = None
def forward(self, *children):
'''
前向计算
'''
children_data = self.concatenate(children)
parent_data = self.activator.forward(
np.dot(self.W, children_data) + self.b
)
self.root = TreeNode(parent_data, children
, children_data)
def backward(self, parent_delta):
'''
BPTS反向传播算法
'''
self.calc_delta(parent_delta, self.root)
self.W_grad, self.b_grad = self.calc_gradient(self.root)
def update(self):
'''
使用SGD算法更新权重
'''
self.W -= self.learning_rate * self.W_grad
self.b -= self.learning_rate * self.b_grad
def reset_state(self):
self.root = None
def concatenate(self, tree_nodes):
'''
将各个树节点中的数据拼接成一个长向量
'''
concat = np.zeros((0,1))
for node in tree_nodes:
concat = np.concatenate((concat, node.data))
return concat
def calc_delta(self, parent_delta, parent):
'''
计算每个节点的delta
'''
parent.delta = parent_delta
if parent.children:
# 根据式2计算每个子节点的delta
children_delta = np.dot(self.W.T, parent_delta) * (
self.activator.backward(parent.children_data)
)
# slices = [(子节点编号子节点delta起始位置子节点delta结束位置)]
slices = [(i, i * self.node_width,
(i + 1) * self.node_width)
for i in range(self.child_count)]
# 针对每个子节点递归调用calc_delta函数
for s in slices:
self.calc_delta(children_delta[s[1]:s[2]],
parent.children[s[0]])
def calc_gradient(self, parent):
'''
计算每个节点权重的梯度,并将它们求和,得到最终的梯度
'''
W_grad = np.zeros((self.node_width,
self.node_width * self.child_count))
b_grad = np.zeros((self.node_width, 1))
if not parent.children:
return W_grad, b_grad
parent.W_grad = np.dot(parent.delta, parent.children_data.T)
parent.b_grad = parent.delta
W_grad += parent.W_grad
b_grad += parent.b_grad
for child in parent.children:
W, b = self.calc_gradient(child)
W_grad += W
b_grad += b
return W_grad, b_grad
def dump(self, **kwArgs):
print('root.data: %s' % self.root.data)
print('root.children_data: %s' % self.root.children_data)
if kwArgs.has_key('dump_grad'):
print('W_grad: %s' % self.W_grad)
print('b_grad: %s' % self.b_grad)
def data_set():
children = [
TreeNode(np.array([[1],[2]])),
TreeNode(np.array([[3],[4]])),
TreeNode(np.array([[5],[6]]))
]
d = np.array([[0.5],[0.8]])
return children, d
def gradient_check():
'''
梯度检查
'''
# 设计一个误差函数,取所有节点输出项之和
error_function = lambda o: o.sum()
rnn = RecursiveLayer(2, 2, IdentityActivator(), 1e-3)
# 计算forward值
x, d = data_set()
rnn.forward(x[0], x[1])
rnn.forward(rnn.root, x[2])
# 求取sensitivity map
sensitivity_array = np.ones((rnn.node_width, 1),
dtype=np.float64)
# 计算梯度
rnn.backward(sensitivity_array)
# 检查梯度
epsilon = 10e-4
for i in range(rnn.W.shape[0]):
for j in range(rnn.W.shape[1]):
rnn.W[i,j] += epsilon
rnn.reset_state()
rnn.forward(x[0], x[1])
rnn.forward(rnn.root, x[2])
err1 = error_function(rnn.root.data)
rnn.W[i,j] -= 2*epsilon
rnn.reset_state()
rnn.forward(x[0], x[1])
rnn.forward(rnn.root, x[2])
err2 = error_function(rnn.root.data)
expect_grad = (err1 - err2) / (2 * epsilon)
rnn.W[i,j] += epsilon
print('weights(%d,%d): expected - actural %.4e - %.4e' % (
i, j, expect_grad, rnn.W_grad[i,j]))
return rnn
def test():
children, d = data_set()
rnn = RecursiveLayer(2, 2, IdentityActivator(), 1e-3)
rnn.forward(children[0], children[1])
rnn.dump()
rnn.forward(rnn.root, children[2])
rnn.dump()
rnn.backward(d)
rnn.dump(dump_grad='true')
return rnn

161
src/py2.x/dl/rnn.py Normal file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from __future__ import print_function
import numpy as np
from cnn import element_wise_op
from activators import ReluActivator, IdentityActivator
try:
reduce # Python 2
except NameError: # Python 3
from functools import reduce
class RecurrentLayer(object):
'''
Desc:
用 RecurrentLayer 类来实现一个循环层。下面的代码是初始化一个循环层可以在构造函数中设置卷积层的超参数。我们注意到循环层有两个权重数组U和W
'''
def __init__(self, input_width, state_width,
activator, learning_rate):
self.input_width = input_width
self.state_width = state_width
self.activator = activator
self.learning_rate = learning_rate
self.times = 0 # 当前时刻初始化为t0
self.state_list = [] # 保存各个时刻的state
self.state_list.append(np.zeros(
(state_width, 1))) # 初始化s0
self.U = np.random.uniform(-1e-4, 1e-4,
(state_width, input_width)) # 初始化U
self.W = np.random.uniform(-1e-4, 1e-4,
(state_width, state_width)) # 初始化W
def forward(self, input_array):
'''
Desc:
实现循环层的前向计算
'''
self.times += 1
state = (np.dot(self.U, input_array) +
np.dot(self.W, self.state_list[-1]))
element_wise_op(state, self.activator.forward)
self.state_list.append(state)
def backward(self, sensitivity_array,
activator):
'''
实现BPTT算法
'''
self.calc_delta(sensitivity_array, activator)
self.calc_gradient()
def update(self):
'''
按照梯度下降,更新权重
'''
self.W -= self.learning_rate * self.gradient
def calc_delta(self, sensitivity_array, activator):
self.delta_list = [] # 用来保存各个时刻的误差项
for i in range(self.times):
self.delta_list.append(np.zeros(
(self.state_width, 1)))
self.delta_list.append(sensitivity_array)
# 迭代计算每个时刻的误差项
for k in range(self.times - 1, 0, -1):
self.calc_delta_k(k, activator)
def calc_delta_k(self, k, activator):
'''
根据k+1时刻的delta计算k时刻的delta
'''
state = self.state_list[k+1].copy()
element_wise_op(self.state_list[k+1],
activator.backward)
self.delta_list[k] = np.dot(
np.dot(self.delta_list[k+1].T, self.W),
np.diag(state[:,0])).T
def calc_gradient(self):
self.gradient_list = [] # 保存各个时刻的权重梯度
for t in range(self.times + 1):
self.gradient_list.append(np.zeros(
(self.state_width, self.state_width)))
for t in range(self.times, 0, -1):
self.calc_gradient_t(t)
# 实际的梯度是各个时刻梯度之和
self.gradient = reduce(
lambda a, b: a + b, self.gradient_list,
self.gradient_list[0]) # [0]被初始化为0且没有被修改过
def calc_gradient_t(self, t):
'''
计算每个时刻t权重的梯度
'''
gradient = np.dot(self.delta_list[t],
self.state_list[t-1].T)
self.gradient_list[t] = gradient
def reset_state(self):
self.times = 0 # 当前时刻初始化为t0
self.state_list = [] # 保存各个时刻的state
self.state_list.append(np.zeros(
(self.state_width, 1))) # 初始化s0
def data_set():
x = [np.array([[1], [2], [3]]),
np.array([[2], [3], [4]])]
d = np.array([[1], [2]])
return x, d
def gradient_check():
'''
梯度检查
'''
# 设计一个误差函数,取所有节点输出项之和
error_function = lambda o: o.sum()
rl = RecurrentLayer(3, 2, IdentityActivator(), 1e-3)
# 计算forward值
x, d = data_set()
rl.forward(x[0])
rl.forward(x[1])
# 求取sensitivity map
sensitivity_array = np.ones(rl.state_list[-1].shape,
dtype=np.float64)
# 计算梯度
rl.backward(sensitivity_array, IdentityActivator())
# 检查梯度
epsilon = 10e-4
for i in range(rl.W.shape[0]):
for j in range(rl.W.shape[1]):
rl.W[i,j] += epsilon
rl.reset_state()
rl.forward(x[0])
rl.forward(x[1])
err1 = error_function(rl.state_list[-1])
rl.W[i,j] -= 2*epsilon
rl.reset_state()
rl.forward(x[0])
rl.forward(x[1])
err2 = error_function(rl.state_list[-1])
expect_grad = (err1 - err2) / (2 * epsilon)
rl.W[i,j] += epsilon
print('weights(%d,%d): expected - actural %f - %f' % (
i, j, expect_grad, rl.gradient[i,j]))
def test():
l = RecurrentLayer(3, 2, ReluActivator(), 1e-3)
x, d = data_set()
l.forward(x[0])
l.forward(x[1])
l.backward(d, ReluActivator())
return l