本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.
简介
手把手教你如何自己设计实现一个深度学习框架(附代码实现) 对机器学习在工程上的实现和抽象说的比较透。 tinynn 只是一个「玩具」版本的深度学习框架,一个成熟的深度学习框架至少还需要:支持自动求导、高运算效率(静态语言加速、支持 GPU 加速)、提供丰富的算法实现、提供易用的接口和详细的文档等等。
抽象层
组件抽象
神经网络运算主要包含训练 training 和预测 predict (或 inference) 两个阶段,训练的基本流程是:输入数据 -> 网络层前向传播 -> 计算损失 -> 网络层反向传播梯度 -> 更新参数,预测的基本流程是 输入数据 -> 网络层前向传播 -> 输出结果。从运算的角度看,主要可以分为三种类型的计算:
- 数据在网络层之间的流动:前向传播和反向传播可以看做是张量 Tensor(多维数组)在网络层之间的流动(前向传播流动的是输入输出,反向传播流动的是梯度),每个网络层会进行一定的运算,然后将结果输入给下一层
- 计算损失:衔接前向和反向传播的中间过程,定义了模型的输出与真实值之间的差异,用来后续提供反向传播所需的信息
- 参数更新:使用计算得到的梯度对网络参数进行更新的一类计算
基于这个三种类型,我们可以对网络的基本组件做一个抽象
- tensor 张量,这个是神经网络中数据的基本单位
- layer 网络层,负责接收上一层的输入,进行该层的运算,将结果输出给下一层,由于 tensor 的流动有前向和反向两个方向,因此对于每种类型网络层我们都需要同时实现 forward 和 backward 两种运算
- loss 损失,在给定模型预测值与真实值之后,该组件输出损失值以及关于最后一层的梯度(用于梯度回传)
optimizer 优化器,负责使用梯度更新模型的参数 然后我们还需要一些组件把上面这个 4 种基本组件整合到一起,形成一个 pipeline
net 组件负责管理 tensor 在 layers 之间的前向和反向传播,同时能提供获取参数、设置参数、获取梯度的接口
model 组件负责整合所有组件,形成整个 pipeline。即 net 组件进行前向传播 -> losses 组件计算损失和梯度 -> net 组件将梯度反向传播 -> optimizer 组件将梯度更新到参数。
基本的框架图如下图
组件实现
按照上面的抽象,我们可以写出整个流程代码如下。PS:一个架构设计的典型案例
# define model net = Net([layer1, layer2, ...]) model = Model(net, loss_fn, optimizer) # training,将 net、loss、optimizer 一起传给 model,model 实现了 forward、backward 和 apply_grad 三个接口分别对应前向传播、反向传播和参数更新三个功能 pred = model.forward(train_X) loss, grads = model.backward(pred, train_Y) model.apply_grad(grads) # inference test_pred = model.forward(test_X)
tensor 张量是神经网络中基本的数据单位,我们这里直接使用 numpy.ndarray 类作为 tensor 类的实现。 layer需要有提供 forward 和 backward 接口进行对应的运算。同时还应该将该层的参数和梯度记录下来。先实现一个基类如下
class Layer(object): def __init__(self, name): self.name = name self.params, self.grads = None, None def forward(self, inputs): raise NotImplementedError def backward(self, grad): raise NotImplementedError
最基础的一种网络层是全连接网络层
class Dense(Layer): def __init__(self, num_in, num_out,w_init=XavierUniformInit(),b_init=ZerosInit()): super().__init__("Linear") self.params = { "w": w_init([num_in, num_out]), "b": b_init([1, num_out])} self.inputs = None # forward 方法接收上层的输入 inputs,实现 的运算 def forward(self, inputs): self.inputs = inputs return inputs @ self.params["w"] + self.params["b"] # backward 的方法接收来自上层的梯度,计算关于参数 和输入的梯度,然后返回关于输入的梯度 def backward(self, grad): self.grads["w"] = self.inputs.T @ grad self.grads["b"] = np.sum(grad, axis=0) return grad @ self.params["w"].T
激活函数可以看做是一种网络层
class Activation(Layer): """Base activation layer""" def __init__(self, name): super().__init__(name) self.inputs = None def forward(self, inputs): self.inputs = inputs return self.func(inputs) def backward(self, grad): return self.derivative_func(self.inputs) * grad def func(self, x): raise NotImplementedError def derivative_func(self, x): raise NotImplementedError
net 类负责管理 tensor 在 layers 之间的前向和反向传播
class Net(object): def __init__(self, layers): self.layers = layers # 按顺序遍历所有层,每层计算的输出作为下一层的输入 def forward(self, inputs): for layer in self.layers: inputs = layer.forward(inputs) return inputs # 逆序遍历所有层,将每层的梯度作为下一层的输入 def backward(self, grad): all_grads = [] # 将每个网络层参数的梯度保存下来返回,后面参数更新需要用到 for layer in reversed(self.layers): grad = layer.backward(grad) all_grads.append(layer.grads) return all_grads[::-1] def get_params_and_grads(self): for layer in self.layers: yield layer.params, layer.grads def get_parameters(self): return [layer.params for layer in self.layers] def set_parameters(self, params): for i, layer in enumerate(self.layers): for key in layer.params.keys(): layer.params[key] = params[i][key]
losses 组件需要做两件事情
class BaseLoss(object): # 计算损失值 def loss(self, predicted, actual): raise NotImplementedError # 计算损失值和关于预测值的梯度 def grad(self, predicted, actual): raise NotImplementedError
optimizer 主要实现一个接口 compute_step,这个方法根据当前的梯度,计算返回实际优化时每个参数改变的步长。
class BaseOptimizer(object): def __init__(self, lr, weight_decay): self.lr = lr self.weight_decay = weight_decay def compute_step(self, grads, params): step = list() # flatten all gradients flatten_grads = np.concatenate([np.ravel(v) for grad in grads for v in grad.values()]) # compute step flatten_step = self._compute_step(flatten_grads) # reshape gradients p = 0 for param in params: layer = dict() for k, v in param.items(): block = np.prod(v.shape) _step = flatten_step[p:p+block].reshape(v.shape) _step -= self.weight_decay * v layer[k] = _step p += block step.append(layer) return step def _compute_step(self, grad): raise NotImplementedError
model 类实现了我们一开始设计的三个接口 forward、backward 和 apply_grad
class Model(object): def __init__(self, net, loss, optimizer): self.net = net self.loss = loss self.optimizer = optimizer def forward(self, inputs): return self.net.forward(inputs) def backward(self, preds, targets): loss = self.loss.loss(preds, targets) grad = self.loss.grad(preds, targets) grads = self.net.backward(grad) params = self.net.get_parameters() step = self.optimizer.compute_step(grads, params) return loss, step def apply_grad(self, grads): for grad, (param, _) in zip(grads, self.net.get_params_and_grads()): for k, v in param.items(): param[k] += grad[k]
执行层
上面的抽象组件这幺热闹,到真正的实现就又是另一幅天地了,可以好好品味 上层model 抽象与底层数据流图的gap,layer1 ==> layer2 ==> …layern 被 展开 成了 op,tenor 在layer 之间的流动 转换为了 dag op 间的流动。 深度学习分布式训练的现状及未来 AI 模型训练任务流程:初始化模型参数 -> 逐条读取训练样本 -> 前向、反向、参数更新 -> 读取下一条样本 -> 前向、反向、参数更新 -> … 循环,直至收敛。在软件层面的体现就是计算机按顺序运行一个个 OP。
几乎所有的 AI 框架都有 OP 的概念,简单来说就是一个函数,完成某个具体的功能,比如说加法、矩阵乘法、卷积等。为什幺要多此一举引入这样一个概念呢?这其实是给每个具体计算功能抽象出一个统一接口,在静态图场景下能实现函数的编排(OP 的自由组合)。
极简demo
动手学深度学习框架(4)- 手把手教你写一个功能完整的简易 Demo
#include <iostream> #include <iomanip> #include <string> #include <unordered_map> #include <random> #include <chrono> #include <any> //自定义 Tensor 类型,这里数据成员非常简单,就是个标量,重载了基本数学运算符 class MyTensor { public: uint32_t data; public: MyTensor(){}; MyTensor(uint32_t x) : data(x) {} MyTensor operator*(const MyTensor& a) { this->data = this->data * a.data; return *this; } MyTensor operator+(const MyTensor& a) { this->data = this->data + a.data; return *this; } MyTensor operator-(const MyTensor& a) { this->data = this->data - a.data; return *this; } MyTensor operator*(const int& a) { this->data = this->data * a; return *this; } }; // Op 基类 class OpBase { public: std::unordered_map<std::string, MyTensor> inputs; std::unordered_map<std::string, MyTensor> outputs; std::unordered_map<std::string, MyTensor> labels; public: virtual void Run() = 0; }; // 乘法前向 Op class MultipylyForward : public OpBase { public: void Run() { MyTensor x = inputs["X"]; MyTensor w = inputs["W"]; MyTensor y1 = x * w; outputs["Y"] = y1; } }; // 乘法反向 Op class MultipylyBackward : public OpBase { public: void Run() { MyTensor x = inputs["X"]; outputs["Y"] = x; } }; // 加法前向 Op class AddForward : public OpBase { public: void Run() { MyTensor x1 = inputs["X1"]; MyTensor x2 = inputs["X2"]; MyTensor y = x1 + x2; outputs["Y"] = y; } }; // 加法反向 Op class AddBackward : public OpBase { public: void Run() { MyTensor x; x.data = 1; outputs["Y"] = x; } }; // loss 前向 Op,这里选取 MSE 作为示例 class LossForward : public OpBase { public: void Run() { MyTensor y = inputs["X"]; MyTensor label = labels["Label"]; MyTensor loss = (y - label) * (y - label); outputs["Y"] = loss; } }; // loss 反向 Op class LossBackward : public OpBase { public: void Run() { MyTensor y = inputs["X"]; MyTensor label = labels["Label"]; outputs["Y"] = (y - label) + (y - label); } }; // 梯度更新 Op class UpdateGrad : public OpBase { public: double lr = 0.1; std::unordered_map<std::string, MyTensor> inputs; std::unordered_map<std::string, MyTensor> outputs; public: void Run() { MyTensor w = inputs["W"]; MyTensor grad = inputs["Grad1"] * inputs["Grad2"] * inputs["Grad3"]; // 链式求导 MyTensor lr; lr.data = this->lr; outputs["Y"] = w - lr * grad; } }; int main() { //1. 用户自定义前向组网 std::vector<std::string> program{"Multiply", "Add", "Loss"}; //2. 框架生成前向op + 自动补全反向OP + 插入梯度更新op std::vector<std::string> ops{"multiply_forward", "add_forward", "loss_forward", "loss_backward", "Add_forward", "multiply_backward", "update_grad"}; //3. 实例化 c++ 端 op 对象 std::vector<OpBase*> opClass {new MultipylyForward(), new AddForward(), new LossForward(), new LossBackward(), new AddBackward(), new MultipylyBackward(), new UpdateGrad()}; //4. 框架根据用户组网,自动给每个op的输入赋值,这里仅以乘法前向op作个例子。一定要记住一点:框架中所有输入数据、 //参数、模型中间输入、输出、以及每个参数的梯度都有一个 string 类型的名字,它的存在是为了给op输入赋值服务的 opClass[0]->inputs["X"] = MyTensor(10); opClass[0]->inputs["W"] = MyTensor(20); for (auto op : opClass) { op->Run(); } //5. 测试第1个op的输出 std::cout << opClass[0]->outputs["Y"].data; // 输出结果:200 }
分布式
每个进程启动后,它需要感知自己全局的进程数( world_size)及自身的进程 ID(或者 rank_id),由于每个进程上运行的都是同一份训练脚本,所以得事先在每个进程所在的系统上设置不同的环境变量,进程运行起来之后,就可以获取环境变量,从而确定自己的角色(Worker、PServer、Coordinator 等)及rank_id、world_size 等信息。
在运行过程中,还有两个重要的环节是 Barrier 和 Communicate. Barrier 的目的是为了实现进程间同步,比较成熟的开源项目有 gloo、mpi 等。Communicate 操作就是实现通信,满足进程间数据交换需求。通信可以在同类型硬件之间发生,比如 CPU 到 CPU、GPU 到 GPU,也可以发生在不同硬件之间,比如 GPU 到 CPU,通信后端也有多种形式,比如 grpc、nccl、socket 等。
Be First to Comment