Press "Enter" to skip to content

机器学习训练框架概述

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

简介

 

手把手教你如何自己设计实现一个深度学习框架(附代码实现) 对机器学习在工程上的实现和抽象说的比较透。 tinynn 只是一个「玩具」版本的深度学习框架,一个成熟的深度学习框架至少还需要:支持自动求导、高运算效率(静态语言加速、支持 GPU 加速)、提供丰富的算法实现、提供易用的接口和详细的文档等等。

 

抽象层

 

组件抽象

 

神经网络运算主要包含训练 training 和预测 predict (或 inference) 两个阶段,训练的基本流程是:输入数据 -> 网络层前向传播 -> 计算损失 -> 网络层反向传播梯度 -> 更新参数,预测的基本流程是 输入数据 -> 网络层前向传播 -> 输出结果。从运算的角度看,主要可以分为三种类型的计算:

 

 

    1. 数据在网络层之间的流动:前向传播和反向传播可以看做是张量 Tensor(多维数组)在网络层之间的流动(前向传播流动的是输入输出,反向传播流动的是梯度),每个网络层会进行一定的运算,然后将结果输入给下一层

 

    1. 计算损失:衔接前向和反向传播的中间过程,定义了模型的输出与真实值之间的差异,用来后续提供反向传播所需的信息

 

    1. 参数更新:使用计算得到的梯度对网络参数进行更新的一类计算

 

 

基于这个三种类型,我们可以对网络的基本组件做一个抽象

 

 

    1. tensor 张量,这个是神经网络中数据的基本单位

 

    1. layer 网络层,负责接收上一层的输入,进行该层的运算,将结果输出给下一层,由于 tensor 的流动有前向和反向两个方向,因此对于每种类型网络层我们都需要同时实现 forward 和 backward 两种运算

 

    1. loss 损失,在给定模型预测值与真实值之后,该组件输出损失值以及关于最后一层的梯度(用于梯度回传)

 

optimizer 优化器,负责使用梯度更新模型的参数 然后我们还需要一些组件把上面这个 4 种基本组件整合到一起,形成一个 pipeline

net 组件负责管理 tensor 在 layers 之间的前向和反向传播,同时能提供获取参数、设置参数、获取梯度的接口
model 组件负责整合所有组件,形成整个 pipeline。即 net 组件进行前向传播 -> losses 组件计算损失和梯度 -> net 组件将梯度反向传播 -> optimizer 组件将梯度更新到参数。

 

基本的框架图如下图

 

 

组件实现

 

按照上面的抽象,我们可以写出整个流程代码如下。PS:一个架构设计的典型案例

 

# define model
net = Net([layer1, layer2, ...])
model = Model(net, loss_fn, optimizer)
# training,将 net、loss、optimizer 一起传给 model,model 实现了 forward、backward 和 apply_grad 三个接口分别对应前向传播、反向传播和参数更新三个功能
pred = model.forward(train_X)
loss, grads = model.backward(pred, train_Y)
model.apply_grad(grads)
# inference
test_pred = model.forward(test_X)

 

tensor 张量是神经网络中基本的数据单位,我们这里直接使用 numpy.ndarray 类作为 tensor 类的实现。 layer需要有提供 forward 和 backward 接口进行对应的运算。同时还应该将该层的参数和梯度记录下来。先实现一个基类如下

 

class Layer(object):
  def __init__(self, name):
      self.name = name
      self.params, self.grads = None, None
  def forward(self, inputs):
      raise NotImplementedError
  def backward(self, grad):
      raise NotImplementedError

 

最基础的一种网络层是全连接网络层

 

class Dense(Layer):
  def __init__(self, num_in, num_out,w_init=XavierUniformInit(),b_init=ZerosInit()):
      super().__init__("Linear")
      self.params = {
          "w": w_init([num_in, num_out]),
          "b": b_init([1, num_out])}
      self.inputs = None
  # forward 方法接收上层的输入 inputs,实现  的运算
  def forward(self, inputs):
      self.inputs = inputs
      return inputs @ self.params["w"] + self.params["b"]
  # backward 的方法接收来自上层的梯度,计算关于参数  和输入的梯度,然后返回关于输入的梯度
  def backward(self, grad):
      self.grads["w"] = self.inputs.T @ grad
      self.grads["b"] = np.sum(grad, axis=0)
      return grad @ self.params["w"].T

 

激活函数可以看做是一种网络层

 

class Activation(Layer):
  """Base activation layer"""
  def __init__(self, name):
      super().__init__(name)
      self.inputs = None
  def forward(self, inputs):
      self.inputs = inputs
      return self.func(inputs)
  def backward(self, grad):
      return self.derivative_func(self.inputs) * grad
  def func(self, x):
      raise NotImplementedError
  def derivative_func(self, x):
      raise NotImplementedError

 

net 类负责管理 tensor 在 layers 之间的前向和反向传播

 

class Net(object):
  def __init__(self, layers):
      self.layers = layers
  # 按顺序遍历所有层,每层计算的输出作为下一层的输入
  def forward(self, inputs):
      for layer in self.layers:
          inputs = layer.forward(inputs)
      return inputs
  # 逆序遍历所有层,将每层的梯度作为下一层的输入
  def backward(self, grad):
      all_grads = [] # 将每个网络层参数的梯度保存下来返回,后面参数更新需要用到
      for layer in reversed(self.layers):
          grad = layer.backward(grad)
          all_grads.append(layer.grads)
      return all_grads[::-1]
  def get_params_and_grads(self):
      for layer in self.layers:
          yield layer.params, layer.grads
  def get_parameters(self):
      return [layer.params for layer in self.layers]
  def set_parameters(self, params):
      for i, layer in enumerate(self.layers):
          for key in layer.params.keys():
              layer.params[key] = params[i][key]

 

losses 组件需要做两件事情

 

class BaseLoss(object):
    # 计算损失值
    def loss(self, predicted, actual):
        raise NotImplementedError
    # 计算损失值和关于预测值的梯度
    def grad(self, predicted, actual):
        raise NotImplementedError

 

optimizer 主要实现一个接口 compute_step,这个方法根据当前的梯度,计算返回实际优化时每个参数改变的步长。

 

class BaseOptimizer(object):
    def __init__(self, lr, weight_decay):
        self.lr = lr
        self.weight_decay = weight_decay
    def compute_step(self, grads, params):
        step = list()
        # flatten all gradients
        flatten_grads = np.concatenate([np.ravel(v) for grad in grads for v in grad.values()])
        # compute step
        flatten_step = self._compute_step(flatten_grads)
        # reshape gradients
        p = 0
        for param in params:
            layer = dict()
            for k, v in param.items():
                block = np.prod(v.shape)
                _step = flatten_step[p:p+block].reshape(v.shape)
                _step -= self.weight_decay * v
                layer[k] = _step
                p += block
            step.append(layer)
        return step
    def _compute_step(self, grad):
        raise NotImplementedError

 

model 类实现了我们一开始设计的三个接口 forward、backward 和 apply_grad

 

class Model(object):
  def __init__(self, net, loss, optimizer):
      self.net = net
      self.loss = loss
      self.optimizer = optimizer
  def forward(self, inputs):
      return self.net.forward(inputs)
  def backward(self, preds, targets):
      loss = self.loss.loss(preds, targets)
      grad = self.loss.grad(preds, targets)
      grads = self.net.backward(grad)
      params = self.net.get_parameters()
      step = self.optimizer.compute_step(grads, params)
      return loss, step
  def apply_grad(self, grads):
    for grad, (param, _) in zip(grads, self.net.get_params_and_grads()):
      for k, v in param.items():
          param[k] += grad[k]

 

执行层

 

上面的抽象组件这幺热闹,到真正的实现就又是另一幅天地了,可以好好品味 上层model 抽象与底层数据流图的gap,layer1 ==> layer2 ==> …layern 被 展开 成了 op,tenor 在layer 之间的流动 转换为了 dag op 间的流动。 深度学习分布式训练的现状及未来 AI 模型训练任务流程:初始化模型参数 -> 逐条读取训练样本 -> 前向、反向、参数更新 -> 读取下一条样本 -> 前向、反向、参数更新 -> … 循环,直至收敛。在软件层面的体现就是计算机按顺序运行一个个 OP。

 

几乎所有的 AI 框架都有 OP 的概念,简单来说就是一个函数,完成某个具体的功能,比如说加法、矩阵乘法、卷积等。为什幺要多此一举引入这样一个概念呢?这其实是给每个具体计算功能抽象出一个统一接口,在静态图场景下能实现函数的编排(OP 的自由组合)。

 

极简demo

 

动手学深度学习框架(4)- 手把手教你写一个功能完整的简易 Demo

 

#include <iostream>
#include <iomanip>
#include <string>
#include <unordered_map>
#include <random>
#include <chrono>
#include <any>
//自定义 Tensor 类型,这里数据成员非常简单,就是个标量,重载了基本数学运算符
class MyTensor {
public:
    uint32_t data;
public:
    MyTensor(){};
    MyTensor(uint32_t x) : data(x) {}
    MyTensor operator*(const MyTensor& a) {
        this->data = this->data * a.data;
        return *this;
    }
    MyTensor operator+(const MyTensor& a) {
        this->data = this->data + a.data;
        return *this;
    }
    MyTensor operator-(const MyTensor& a) {
        this->data = this->data - a.data;
        return *this;
    }
    MyTensor operator*(const int& a) {
        this->data = this->data * a;
        return *this;
    }
};
// Op 基类
class OpBase {
public:
    std::unordered_map<std::string, MyTensor> inputs;
    std::unordered_map<std::string, MyTensor> outputs;
    std::unordered_map<std::string, MyTensor> labels;
public:
    virtual void Run() = 0;
};
// 乘法前向 Op
class MultipylyForward : public OpBase {
public:
    void Run() {
        MyTensor x = inputs["X"];
        MyTensor w = inputs["W"];
        MyTensor y1 = x * w;
        outputs["Y"] = y1;
    }
};
// 乘法反向 Op
class MultipylyBackward : public OpBase {
public:
    void Run() {
        MyTensor x = inputs["X"];
        outputs["Y"] = x;
    }
};
// 加法前向 Op
class AddForward : public OpBase {
public:
    void Run() {
        MyTensor x1 = inputs["X1"];
        MyTensor x2 = inputs["X2"];
        MyTensor y = x1 + x2;
        outputs["Y"] = y;
    }
};
// 加法反向 Op
class AddBackward : public OpBase {
public:
    void Run() {
        MyTensor x;
        x.data = 1;
        outputs["Y"] = x;
    }
};
// loss 前向 Op,这里选取 MSE 作为示例
class LossForward : public OpBase {
public:
    void Run() {
        MyTensor y = inputs["X"];
        MyTensor label = labels["Label"];
        MyTensor loss = (y - label) * (y - label);
        outputs["Y"] = loss;
    }
};
// loss 反向 Op
class LossBackward : public OpBase {
public:
    void Run() {
        MyTensor y = inputs["X"];
        MyTensor label = labels["Label"];
        outputs["Y"] = (y - label) + (y - label);
    }
};
// 梯度更新 Op
class UpdateGrad : public OpBase {
public:
    double lr = 0.1;
    std::unordered_map<std::string, MyTensor> inputs;
    std::unordered_map<std::string, MyTensor> outputs;
public:
    void Run() {
        MyTensor w = inputs["W"];
        MyTensor grad = inputs["Grad1"] * inputs["Grad2"] * inputs["Grad3"];  // 链式求导
        MyTensor lr;
        lr.data = this->lr;
        outputs["Y"] = w - lr * grad;
    }
};
int main() {
    //1. 用户自定义前向组网
    std::vector<std::string> program{"Multiply", "Add", "Loss"};
    //2. 框架生成前向op + 自动补全反向OP + 插入梯度更新op
    std::vector<std::string> ops{"multiply_forward", "add_forward", "loss_forward",
        "loss_backward", "Add_forward", "multiply_backward", "update_grad"};
    //3. 实例化 c++ 端 op 对象
    std::vector<OpBase*> opClass {new MultipylyForward(), new AddForward(), new LossForward(),
        new LossBackward(), new AddBackward(), new MultipylyBackward(), new UpdateGrad()};
    //4. 框架根据用户组网,自动给每个op的输入赋值,这里仅以乘法前向op作个例子。一定要记住一点:框架中所有输入数据、
    //参数、模型中间输入、输出、以及每个参数的梯度都有一个 string 类型的名字,它的存在是为了给op输入赋值服务的
    opClass[0]->inputs["X"] = MyTensor(10);
    opClass[0]->inputs["W"] = MyTensor(20);
    for (auto op : opClass) {
        op->Run();
    }
    //5. 测试第1个op的输出
    std::cout << opClass[0]->outputs["Y"].data;  // 输出结果:200
}

 

分布式

 

深度学习分布式训练框架的运行机制

 

每个进程启动后,它需要感知自己全局的进程数( world_size)及自身的进程 ID(或者 rank_id),由于每个进程上运行的都是同一份训练脚本,所以得事先在每个进程所在的系统上设置不同的环境变量,进程运行起来之后,就可以获取环境变量,从而确定自己的角色(Worker、PServer、Coordinator 等)及rank_id、world_size 等信息。

 

在运行过程中,还有两个重要的环节是 Barrier 和 Communicate. Barrier 的目的是为了实现进程间同步,比较成熟的开源项目有 gloo、mpi 等。Communicate 操作就是实现通信,满足进程间数据交换需求。通信可以在同类型硬件之间发生,比如 CPU 到 CPU、GPU 到 GPU,也可以发生在不同硬件之间,比如 GPU 到 CPU,通信后端也有多种形式,比如 grpc、nccl、socket 等。

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注