Press "Enter" to skip to content

运用计算图搭建卷积神经网络(CNN)

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

 

文章作者:张觉非 360

 

编辑整理:Hoh Xil

 

内容来源:作者授权

 

出品社区:DataFun

 

注:欢迎转载,转载请注明出处

 

“您否认这座由花与矿物构成的永恒的药房,专为治疗被称为人类的永恒的患者?!” “我既不否认药房,也不否认患者。 我否认的是医生。” ——《巴黎圣母院》

 

干嘛引这幺两句?众明公自行体味。由于加入了矩阵计算的算子,我们的计算图框架(  VectorSlow  )现在该改叫  MatrixSlow  了。 也许哪一天我们会将它改进成  TensorSlow  ,但总之  Slow  这个特性我们是不会放弃的。

 

在之前的文章中,我们介绍了计算图和自动求导的原理及实现( 这里 ),用  MatrixSlow  搭建了一些可以被纳入“非全连接神经网络”范畴的若干模型( 这里 ),还搭建了一些深度不一的多层全连接神经网络,并用动画展示了它们的训练过程( 这里 )。 现在,我们用  MatrixSlow  搭建卷积神经网络(CNN)并用来识别 MNIST 。关于 CNN 的介绍,可见 这里:

 

https://zhuanlan.zhihu.com/p/25249694

 

作者对本文代码保留后续修改的权利,完整代码请见码云:

 

https://gitee.com/zhangjuefei/computing_graph_demo

 

▌ 1. 卷积

 

我们首先实现卷积算子,代码如下( node.py ):

 

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py

 

class Convolve(Node):
    """
    以第二个父节点的值为卷积核,对第一个父节点的值做二维离散卷积
    """
    def __init__(self, *parents):
        assert len(parents) == 2
        Node.__init__(self, *parents)
        self.padded = None
    def compute(self):
        data = self.parents[0].value  # 输入特征图
        kernel = self.parents[1].value  # 卷积核
        w, h = data.shape  # 输入特征图的宽和高
        kw, kh = kernel.shape  # 卷积核尺寸
        hkw, hkh = int(kw / 2), int(kh / 2)  # 卷积核长宽的一半
        # 补齐数据边缘
        pw, ph = tuple(np.add(data.shape, np.multiply((hkw, hkh), 2)))
        self.padded = np.mat(np.zeros((pw, ph)))
        self.padded[hkw:hkw + w, hkh:hkh + h] = data
        self.value = np.mat(np.zeros((w, h)))
        # 二维离散卷积
        for i in np.arange(hkw, hkw + w):
            for j in np.arange(hkh, hkh + h):
                self.value[i - hkw, j - hkh] = np.sum(
                    np.multiply(self.padded[i - hkw:i - hkw + kw, j - hkh:j - hkh + kh], kernel))
    def get_jacobi(self, parent):
        data = self.parents[0].value  # 输入特征图
        kernel = self.parents[1].value  # 卷积核
        w, h = data.shape  # 输入特征图的宽和高
        kw, kh = kernel.shape  # 卷积核尺寸
        hkw, hkh = int(kw / 2), int(kh / 2)  # 卷积核长宽的一半
        # 补齐数据边缘
        pw, ph = tuple(np.add(data.shape, np.multiply((hkw, hkh), 2)))
        jacobi = []
        mask = np.mat(np.zeros((pw, ph)))
        if parent is self.parents[0]:
            for i in np.arange(hkw, hkw + w):
                for j in np.arange(hkh, hkh + h):
                    mask *= 0
                    mask[i - hkw:i - hkw + kw, j - hkh:j - hkh + kh] = kernel
                    jacobi.append(mask[hkw:hkw+w, hkh:hkh+h].A1)
        elif parent is self.parents[1]:
            for i in np.arange(hkw, hkw + w):
                for j in np.arange(hkh, hkh + h):
                    jacobi.append(self.padded[i - hkw:i - hkw + kw, j - hkh:j - hkh + kh].A1)
        else:
            raise Exception("You're not my father")
        return np.mat(jacobi)

 

Convolve 接受两个父节点:图像(或叫特征图)节点,它是一个二维矩阵;卷积核节点也是一个二维矩阵。Convolve 暂时不支持设置步幅(stride)和填充方式(padding),步幅一律为 1 ,使用补零填充。compute 以第二个父节点的值为滤波器,对第一个父节点的值做二维离散卷积(滤波)。get_jacobi 函数返回当前本节点对特征图或卷积核的雅克比矩阵。

 

▌ 2. 最大值池化

 

我们来实现最大值池化算子,代码如下( node.py ):

 

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py

 

class MaxPooling(Node):
    """
    最大值池化
    """
    def __init__(self, parent, size, stride):
        Node.__init__(self, parent)
        assert isinstance(stride, tuple) and len(stride) == 2
        self.stride = stride
        assert isinstance(size, tuple) and len(size) == 2
        self.size = size
        self.flag = None
    def compute(self):
        data = self.parents[0].value  # 输入特征图
        w, h = data.shape  # 输入特征图的宽和高
        dim = w * h
        sw, sh = self.stride
        kw, kh = self.size  # 池化核尺寸
        hkw, hkh = int(kw / 2), int(kh / 2)  # 池化核长宽的一半
        result = []
        flag = []
        for i in np.arange(0, w, sw):
            row = []
            for j in np.arange(0, h, sh):
                # 取池化窗口中的最大值
                top, bottom = max(0, i - hkw), min(w, i + hkw + 1)
                left, right = max(0, j - hkh), min(h, j + hkh + 1)
                window = data[top:bottom, left:right]
                row.append(
                    np.max(window)
                )
                # 记录最大值在原特征图中的位置
                pos = np.argmax(window)
                w_width = right - left
                offset_w, offset_h = top + pos // w_width, left + pos % w_width
                offset = offset_w * w + offset_h
                tmp = np.zeros(dim)
                tmp[offset] = 1
                flag.append(tmp)
            result.append(row)
        self.flag = np.mat(flag)
        self.value = np.mat(result)
    def get_jacobi(self, parent):
        assert parent is self.parents[0] and self.jacobi is not None
        return self.flag

 

MaxPooling 节点接受 size 参数指定池化窗口(池化核)大小,它是一个包含宽和高的 tuple 。MaxPooling 节点还接受 stride 参数,它也是一个 tuple ,指定两个方向的步幅。compute 方法移动池化窗口,取窗口中的最大值,同时以标志位的形式记录下被取的最大值在原特征图点的位置,get_jacobi 就以这个标志位作为雅可比,具体见代码。

 

▌3.  辅助性节点

 

我们需要一个 reshape 算子改变数据的形状(将二维特征图转成一维向量,连接全连接层),代码如下( node.py ):

 

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py

 

class Reshape(Node):
    """
    改变父节点的值(矩阵)的形状
    """
    def __init__(self, parent, shape):
        Node.__init__(self, parent)
        assert isinstance(shape, tuple) and len(shape) == 2
        self.to_shape = shape
    def compute(self):
        self.value = self.parents[0].value.reshape(self.to_shape)
    def get_jacobi(self, parent):
        assert parent is self.parents[0]
        return np.mat(np.eye(self.dimension()))

 

卷积部分结束时,数据的形状是一组多个特征图。这时若要连接全连接层,需要将这些特征图展平并连接成一个向量。Flatten 节点肩负这个任务,代码如下( node.py ):

 

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py

 

class Flatten(Node):
    """
    将多个父节点的值连接成向量
    """
    def compute(self):
        assert len(self.parents) > 0
        # 将所有负矩阵展平并连接成一个向量
        self.value = np.concatenate(
            [p.value.flatten() for p in self.parents],
            axis=1
        ).T
    def get_jacobi(self, parent):
        assert parent in self.parents
        dimensions = [p.dimension() for p in self.parents]  # 各个父节点的元素数量
        pos = self.parents.index(parent)  # 当前是第几个父节点
        dimension = parent.dimension()  # 当前父节点的元素数量
        assert dimension == dimensions[pos]
        jacobi = np.mat(np.zeros((self.dimension(), dimension)))
        start_row = int(np.sum(dimensions[:pos]))
        jacobi[start_row:start_row + dimension, 0:dimension] = np.eye(dimension)
        return jacobi

 

▌4.  构造“层”的辅助函数

 

我们写几个构造 CNN 的“层”(layer)的函数,以方便网络的构建,代码如下( layer.py ):

 

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/layer.py

 

from node import *

def conv(feature_maps, input_shape, kernels, kernel_shape, activation):
    """
    :param feature_maps: 数组,包含多个输入特征图,它们应该是值为同形状的矩阵的节点
    :param input_shape: tuple ,包含输入特征图的形状(宽和高)
    :param kernels: 整数,卷积层的卷积核数量
    :param kernel_shape: tuple ,卷积核的形状(宽和高)
    :param activation: 激活函数类型
    :return: 数组,包含多个输出特征图,它们是值为同形状的矩阵的节点
    """
    outputs = []
    for i in range(kernels):
        channels = []
        for  in feature_maps:
            kernel = Variable(kernel_shape, init=True, trainable=True)
            conv = Convolve(fm, kernel)
            channels.append(conv)
        channles = Add(*channels)
        bias = Variable(input_shape, init=True, trainable=True)
        affine = Add(channles, bias)
        if activation == "ReLU":
            outputs.append(ReLU(affine))
        elif activation == "Logistic":
            outputs.append(Logistic(affine))
        else:
            outputs.append(affine)
    assert len(outputs) == kernels
    return outputs

 

conv 函数接受保存多个输入特征图(或者称作输入图像的多个通道)的数组、输入特征图的尺寸、卷积核数、卷积核尺寸、激活函数种类(”ReLU” 或 “Logistic” 以及未来会有的其他种类)。conv 的返回值也是一个数组,包含卷积层的多个输出特征图(通道),该数量与卷积核数量一致。

 

def pooling(feature_maps, kernel_shape, stride):
    """
    :param feature_maps: 数组,包含多个输入特征图,它们应该是值为同形状的矩阵的节点
    :param kernel_shape: tuple ,池化核(窗口)的形状(宽和高)
    :param stride: tuple ,包含横向和纵向步幅
    :return: 数组,包含多个输出特征图,它们是值为同形状的矩阵的节点
    """
    outputs = []
    for fm in feature_maps:
        outputs.append(MaxPooling(fm, size=kernel_shape, stride=stride))
    return outputs

 

pooling 函数构造一个池化层(目前只支持最大值池化)。该函数接受保存多个输入特征图的数组、池化核(即池化窗口)尺寸、横向和纵向的步幅。pooling 函数返回一个数组,包含多个输出特征图。

 

def fc(input, input_size, size, activation):
    """
    :param input: 输入向量
    :param input_size: 输入向量的维度
    :param size: 神经元个数,即输出个数(输出向量的维度)
    :param activation: 激活函数类型
    :return: 输出向量
    """
    weights = Variable((size, input_size), init=True, trainable=True)
    bias = Variable((size, 1), init=True, trainable=True)
    affine = Add(MatMul(weights, input), bias)
    if activation == "ReLU":
        return ReLU(affine)
    elif activation == "Logistic":
        return Logistic(affine)
    else:
        return affine

 

fc 函数构造全连接层,接受输入数量(输入向量的维数),神经元个数(输出向量的维数)以及激活函数的种类。

 

▌5 . 构造卷积神经网络

 

现在我们就可以搭建一个简单的卷积神经网络了,代码如下( test_cnn.py ):

 

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/test_cnn.py

 

from sklearn.metrics import accuracy_score
from node import *
from util import mnist
from optimizer import *
from layer import *
print(__file__)
train_x, train_y, test_x, test_y = mnist('./data/MNIST')
test_x = test_x[:1000]
test_y = test_y[:1000]
# train_x = train_x[:10000]
# train_y = train_y[:10000]
img_shape = (28, 28)
img = Variable(img_shape, init=False, trainable=False)  # 占位符,28x28 的图像
conv1 = conv([img], img_shape, 6, (3, 3), "ReLU")  # 第一卷积层
pooling1 = pooling(conv1, (3, 3), (2, 2))  # 第一池化层
conv2 = conv(pooling1, (14, 14), 6, (3, 3), "ReLU")  # 第二卷积层
pooling2 = pooling(conv2, (3, 3), (2, 2))  # 第二池化层
fc1 = fc(Flatten(*pooling2), 294, 100, "ReLU")  # 第一全连接层
logits = fc(fc1, 100, 10, "None")  # 第二全连接层,无激活函数
# 分类概率
prob = SoftMax(logits)
# 训练标签
label = Variable((10, 1), trainable=False)
# 交叉熵损失
loss = CrossEntropyWithSoftMax(logits, label)
# Adam 
optimizer = Adam(default_graph, loss, 0.005, batch_size=64)

# 训练
print("start training")
for e in range(10):
    for i in range(len(train_x)):
        img.set_value(np.mat(train_x[i, :]).reshape(28, 28))
        label.set_value(np.mat(train_y[i, :]).T)
        # 执行一步优化
        optimizer.one_step()
        # 显示进度
        loss.forward()
        percent = int((i + 1) / len(train_x) * 100)
        print("".join(["="] * percent) + "> loss:{:.3f} {:d}({:.0f}%)".format(loss.value[0, 0], i + 1, percent))
        if i > 1 and (i + 1) % 5000 == 0:
            # 在测试集上评估模型正确率
            probs = []
            losses = []
            for j in range(len(test_x)):
                img.set_value(np.mat(test_x[j, :]).reshape(28, 28))
                label.set_value(np.mat(test_y[j, :]).T)
                # 前向传播计算概率
                prob.forward()
                probs.append(prob.value.A1)
                # 计算损失值
                loss.forward()
                losses.append(loss.value[0, 0])
                # print("test instance: {:d}".format(j))
            # 取概率最大的类别为预测类别
            pred = np.argmax(np.array(probs), axis=1)
            truth = np.argmax(test_y, axis=1)
            accuracy = accuracy_score(truth, pred)
            default_graph.draw()
            print("epoch: {:d}, iter: {:d}, loss: {:.3f}, accuracy: {:.2f}%".format(e + 1, i + 1, np.mean(losses), accuracy * 100))

 

这个卷积神经网络包含两个卷积层,第一个卷积层有 6 个卷积核,第 2 个卷积层也有 6 个卷积核。卷积核的尺寸都是 3X3 。每个卷积层之后是一个 stride 为 2 的最大值池化层,它们将特征图尺寸缩小一半。经过第二个最大值池化层后,特征图尺寸成为  7X7=49 。之后用 Flatten 算子将 6 个特征图展平成 294 维向量,后接全连接层。第一个全连接层的输出是 100 维向量,第二个全连接层输出 10 维向量,作为十分类的 logits 。接下来是损失值部分,然后是训练主循环。我们构造的 CNN 的计算图如下:

 

 

我们构造的 CNN

 

▌6.  训练 Sobel 滤波器

 

在两年前的博文《 卷积神经网络简介 》中,我们从可训练滤波器的角度引入 CNN 。 那时我们举了一个例子:训练 Sobel 。我们用 (纵向)Sobel 算子对莱娜图做滤波,效果是强化原图中的纵向边缘。之后,以原图为输入,以 Sobel 算子滤出来的图为 target ,构造一个计算图,对输入施加  3X3   的滤波,输出与原图同尺寸的图像,以输出图像与 target 图像所有像素的误差平方和作为损失,将计算图的随机卷积核训练成 Sobel 滤波器,我们看代码( test_sobel.py ):

 

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/lost%20&%20found/test_sobel.py

 

import skimage, os
import numpy as np
from node import *
from optimizer import *
from scipy.ndimage.filters import convolve
import matplotlib.pyplot as plt
# Sobel 滤波器
filter = np.mat([
        [1.,  0.,   -1.],
        [2.,  0.,   -2.],
        [1.,  0.,   -1.]])
# 图像尺寸
w, h = 128, 128
# 搭建计算图
input_img = Variable((w, h), init=False, trainable=False)  # 输入图像占位变量
target_img = Variable((w, h), init=False, trainable=False)  # target 图像占位变量
kernel = Variable((3, 3), init=True, trainable=True)  # 被训练的卷积核
# 对输入图像施加滤波
conv_img = Convolve(input_img, kernel)
# 将输入图像和 target 图像展成向量
target_img_flat = Reshape(target_img, shape=(w * h, 1))
conv_img_flat = Reshape(conv_img, shape=(w * h, 1))
# 常数(矩阵)[[-1]]
minus = Variable((1, 1), init=False, trainable=False)
minus.set_value(np.mat(-1))
# 常数(矩阵):图像总像素数的倒数
n = Variable((1, 1), init=False, trainable=False)
n.set_value(np.mat(1.0 / (w * h)))
# 损失值:输出图像与 target 图像的平均像素平方误差
loss = MatMul(Dot(
        Add(target_img_flat, MatMul(conv_img_flat, minus)), 
        Add(target_img_flat, MatMul(conv_img_flat, minus))
    ), n)
# RMSProp 优化器
optimizer = Adam(default_graph, loss, 0.06, batch_size=1)
# 读取 lena 图,将 rgb 图像转成单通道灰度图,并 resize 成指定大小
img = skimage.transform.resize(
                    skimage.color.rgb2gray(
                            skimage.io.imread("lena.png")
                            ),
                    (w, h)
                    )
# 制作目标图像:对原图像施加 Sobel 滤波器
sobel = np.mat([[1,0,-1],[2,0,-2],[1,0,-1]])
target =  np.zeros((w, h))
convolve(input=img, output=target, weights=filter, mode="constant", cval=0.0)
# 保存原图和经过 Sobel 滤波的图像
skimage.io.imsave("origin.png", np.minimum(np.maximum(img, 0.0), 1.0))
skimage.io.imsave("target.png", np.minimum(np.maximum(target, 0.0), 1.0))
# 以原图为输入,以经过 Sobel 滤波的图像为 target
input_img.set_value(np.mat(img))
target_img.set_value(np.mat(target)) 
i = 0
for e in range(1000):
    
    # 一次迭代
    optimizer.one_step()                
    
    # 计算损失值
    loss.forward()
    print("pic:{:s},loss:{:.6f}".format(p, loss.value[0, 0]))
    
    # 保存当前卷积核对输入图像做滤波的结果
    fname = "{:d}.png".format(i)
    if os.path.exists(fname):
        os.remove(fname)
    skimage.io.imsave(fname, np.minimum(np.maximum(conv_img.value, 0.0), 1.0))
    i += 1

 

迭代进行到 302 次时,损失值已经不怎幺下降了,我们手动终止了进程。看看每次迭代输出的图像与 target 图像:

 

 

输出图像与 target 图像的比较

 

可见,输出图像已经很接近 Sobel 滤波的 target 图像了。这时,被训练的 kernel 节点的值是:

 

 

被训练的卷积核节点(kernel)的值

 

卷积核已经被训练得比较接近 Sobel 算子了。

 

作者介绍

 

张觉非,本科毕业于复旦大学,硕士毕业于中国科学院大学,先后任职于新浪微博、阿里,目前就职于奇虎360,任机器学习技术专家。

 

Be First to Comment

发表评论

邮箱地址不会被公开。 必填项已用*标注