本着“凡我不能创造的,我就不能理解”的思想,本系列文章会基于纯Python以及NumPy从零创建自己的深度学习框架,该框架类似PyTorch能实现自动求导。
要深入理解深度学习,从零开始创建的经验非常重要,从自己可以理解的角度出发,尽量不使用外部完备的框架前提下,实现我们想要的模型。本系列文章的宗旨就是通过这样的过程,让大家切实掌握深度学习底层实现,而不是仅做一个调包侠。
基于之前的计算图代码,在实现类似torch.unbind
时遇到了问题。因此,我们需要重写计算图的支撑代码,以支持这些运算。
存在的问题
在实现有单个输入产生多个输出的运算时,比如unbind
或split
。我们分别对多个输出后续进行不同的运算,在反向传播时,它们的梯度应该不同。
此时在反向传播时,应该传入一个梯度列表,每个元素代表由每个输出贡献的梯度。现有的代码只接收单个梯度,而不是列表。
重写Tensor#backward
def backward(self, grad: "Tensor" = None, retain_grad=False, create_graph=False) -> None: ''' 实现Tensor的反向传播 Args: grad: 如果该Tensor不是标量,则需要传递梯度进来 retain_grad: 是否保留梯度的中间变量 create_graph: 是否整个计算梯度的过程也需要保留到计算图中,即double_backprop Returns: ''' # 只能在requires_grad=True的Tensor上调用此方法 assert self.requires_grad, "called backward on tensor do not require grad" if not Config.backprop: return # 如果传递过来的grad为空 if grad is None: if self.shape == (): # 设置梯度值为1,grad本身不需要计算梯度 self._grad = Tensor(1., device=self.device) else: # 如果当前Tensor得到不是标量,那幺grad必须指定 raise RuntimeError("grad must be specified for non scalar") else: self._grad = ensure_tensor(grad, device=self.device) funcs = [] # 候选函数堆 seen_set = set() def add_func(f): if f not in seen_set: # heapq是小顶堆,为了实现大顶堆的效果,需要加一个负号 heapq.heappush(funcs, (-f.generation, len(seen_set), f)) seen_set.add(f) add_func(self.creator) while funcs: _, _, f = heapq.heappop(funcs) # 获取输出对应的梯度,解决多个输出梯度不一致的问题 gys = [output().grad.data for output in f.outputs] # output 是 weakref with using_config('backprop', create_graph): with OpWrapper(f.__class__.__name__, gys, backward=True): gxs = f.backward(*gys) if not isinstance(gxs, tuple): gxs = (gxs,) for x, gx in zip(f.inputs, gxs): if x.requires_grad and gx is not None: assert x.shape == gx.shape, f"grad shape must match tensor shape in { f!r}, { gx.shape!r} != { x.shape!r}" gx = Tensor(gx, device=self.device, dtype=self.dtype) if x.grad is None: x._grad = gx else: x._grad = x.grad + gx if x.creator is not None: add_func(x.creator) if not retain_grad: for y in f.outputs: y()._grad = None
修改点:
通过大顶堆来实现计算图的回溯算法。同时为Tensor
增加一个creator
,产生该输出的运算(function)。
这里增加了代次(generation)的概念,当前输出的generation由creator
对应的加1。
增加了weakref
对内存利用进行优化。
通过create_graph
支持double_backprop。
重写Function
class Function: def __init__(self) -> None: # 保存需要在backward()中使用的Tensor或其他对象(如Shape) self.saved_tensors = [] def save_for_backward(self, *x: Any) -> None: self.saved_tensors.extend(x) def forward(self, *args: Any, **kwargs: Any) -> NdArray: '''前向传播,进行真正运算的地方''' raise NotImplementedError("You must implement the forward function for custom Function.") def backward(self, grad: NdArray) -> Any: '''实现反向传播,计算梯度''' raise NotImplementedError("You must implement the backward method for your custom Function " "to use it with backward mode AD.") def __call__(self, *xs: "Tensor", **kwargs) -> "Tensor": # [t.data for t in xs]遍历Tensor中的data(NdArray)值,参与实际计算的都是NumPy的数组。 ys = self.forward(*[t.data for t in xs], **kwargs) requires_grad = any([t.requires_grad for t in xs]) if not isinstance(ys, tuple): ys = (ys,) outputs = [Tensor(y, device=xs[0].device, requires_grad=requires_grad) for y in ys] if Config.backprop: self.generation = max([x.generation for x in xs]) for output in outputs: # 设定每个输出是由此函数得到的 output.set_creator(self) self.inputs = xs # 记录输入 self.outputs = [weakref.ref(output) for output in outputs] # 通过弱引用记录输出 # 返回多个则通过元组 return tuple(outputs) if len(outputs) > 1 else outputs[0]
所有操作的基类Function
进行了重写,同时尽可能兼容之前的代码,在调用方法__call__
中维护了该操作的输入和输出。
如果有多个输出,统一通过元组来保存。
重写Function子类
以Split
为例,作为stack
的逆操作,我们的split
实际上就是torch.unbind
。
旧代码:
class Split(Function): '''Stack的逆操作''' def forward(ctx, inputs: NdArray, axis: int) -> NdArray: xp = get_array_module(inputs) xs = xp.split(inputs, inputs.shape[axis], axis) ys = [xp.squeeze(y, axis) for y in xs] # 去掉维度axis ctx.save_for_backward(len(ys), axis) return tuple(ys) def backward(ctx, grad: NdArray) -> NdArray: size, axis = ctx.saved_tensors grad /= size bigger_grad = [Tensor(grad)] * size return stack(bigger_grad, axis)
改写成:
class Split(Function): '''Stack的逆操作''' def forward(self, inputs: NdArray, axis: int) -> NdArray: xp = get_array_module(inputs) xs = xp.split(inputs, inputs.shape[axis], axis) ys = [xp.squeeze(y, axis) for y in xs] # 去掉维度axis self.save_for_backward(xp, axis, ys[0].shape, inputs.dtype) return tuple(ys) def backward(self, *grad: List[NdArray]) -> NdArray: xp, axis, shape, dtype = self.saved_tensors grads = [Tensor(xp.zeros(shape, dtype)) if g is None else Tensor(g) for g in grad] return stack(grads, axis)
改动如下:
ctx
变成了self
为了正确性,重写了其backward
现在梯度支持传入列表,代表split
多个输出贡献的不同梯度
这样才能跑通下面的测试用例:
def test_split(): x = np.arange(6).reshape((2, 3)).astype(np.float32) # x = array([[0., 1., 2.], # [3., 4., 5.]], dtype=float32) mx = Tensor(x, requires_grad=True) tx = torch.tensor(x, requires_grad=True) my = F.split(mx) ty = torch.split(tx, 1) # 这里返回的是元组 assert isinstance(my, tuple) assert np.allclose(my[0].data, ty[0].data) (my[0]).sum().backward() (ty[0]).sum().backward() print(mx.grad.data) print(tx.grad.data) assert np.allclose(mx.grad.data, tx.grad.data)
test_split.py::test_split PASSED [100%] [[1. 1. 1.] [0. 0. 0.]] tensor([[1., 1., 1.], [0., 0., 0.]])
这里由于只对split
后的第一个元素my[0]
进行了后续操作(sum
),因此反向传播后,也应该只有该元素会产生梯度,如上输出。
完整代码
类似地,我们就可以实现chunk
操作,即cat
的逆操作,将Tensor沿某一维分开,其参数chunks
为分割的份数,axis
为分割的维度。具体实现可以参考完整代码:
https://github.com/nlp-greyfoss/metagrad
Be First to Comment