## 2. config

```import torch
import torch.nn as nn
import numpy as np
import math
class Config(object):
def __init__(self):
self.vocab_size = 6
self.d_model = 20
assert self.d_model % self.n_heads == 0
self.UNK = 5
self.N = 6
self.p = 0.1
config = Config()```

## 3. Embedding

Embedding部分接受原始的文本输入(batch_size*seq_len,例:[[1,3,10,5],[3,4,5],[5,3,1,1]])，叠加一个普通的Embedding层以及一个Positional Embedding层，输出最后结果。

The embedding lookup itself is PyTorch's built-in `torch.nn.Embedding` layer.

```class Embedding(nn.Module):
def __init__(self,vocab_size):
super(Embedding, self).__init__()
def forward(self,x):
for i in range(len(x)):
x[i].extend([config.UNK] * (config.padding_size - len(x[i]))) # 注意 UNK是你词表中用来表示oov的token索引，这里进行了简化，直接假设为6
else:
x = self.embedding(torch.tensor(x)) # batch_size * seq_len * d_model
return x```

```class Positional_Encoding(nn.Module):
def __init__(self,d_model):
super(Positional_Encoding,self).__init__()
self.d_model = d_model
def forward(self,seq_len,embedding_dim):
positional_encoding = np.zeros((seq_len,embedding_dim))
for pos in range(positional_encoding.shape[0]):
for i in range(positional_encoding.shape[1]):
positional_encoding[pos][i] = math.sin(pos/(10000**(2*i/self.d_model))) if i % 2 == 0 else math.cos(pos/(10000**(2*i/self.d_model)))

## 4. Encoder

In the encoder's `forward`, the attention sub-layer is invoked with `y = x`, so `V` is projected from the same input as `Q` and `K` (self-attention).

```class Mutihead_Attention(nn.Module):
self.dim_v = dim_v
self.dim_k = dim_k
self.q = nn.Linear(d_model,dim_k)
self.k = nn.Linear(d_model,dim_k)
self.v = nn.Linear(d_model,dim_v)
self.o = nn.Linear(dim_v,d_model)
self.norm_fact = 1 / math.sqrt(d_model)
# 此处是 sequence mask ，防止 decoder窥视后面时间步的信息。
matirx = np.ones((dim,dim))
assert self.dim_k % self.n_heads == 0 and self.dim_v % self.n_heads == 0
# size of x : [batch_size * seq_len * batch_size]
# 对 x 进行自注意力
Q = self.q(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
K = self.k(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
V = self.v(y).reshape(-1,y.shape[0],y.shape[1],self.dim_v // self.n_heads) # n_heads * batch_size * seq_len * dim_v
# print("Attention V shape : {}".format(V.shape))
attention_score = torch.matmul(Q,K.permute(0,1,3,2)) * self.norm_fact
output = torch.matmul(attention_score,V).reshape(y.shape[0],y.shape[1],-1)
# print("Attention output shape : {}".format(output.shape))
output = self.o(output)
return output```

### Feed Forward

*(figure: the position-wise feed-forward network, `FFN(x) = Linear(ReLU(Linear(x)))`)*

```class Feed_Forward(nn.Module):
def __init__(self,input_dim,hidden_dim=2048):
super(Feed_Forward, self).__init__()
self.L1 = nn.Linear(input_dim,hidden_dim)
self.L2 = nn.Linear(hidden_dim,input_dim)
def forward(self,x):
output = nn.ReLU()(self.L1(x))
output = self.L2(output)
return output```

```class Add_Norm(nn.Module):
def __init__(self):
self.dropout = nn.Dropout(config.p)
def forward(self,x,sub_layer,**kwargs):
sub_output = sub_layer(x,**kwargs)
# print("{} output : {}".format(sub_layer,sub_output.size()))
x = self.dropout(x + sub_output)
layer_norm = nn.LayerNorm(x.size()[1:])
out = layer_norm(x)
return out```

OK，Encoder中所有模块我们已经讲解完毕，接下来我们将其拼接作为Encoder

```class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.positional_encoding = Positional_Encoding(config.d_model)
self.feed_forward = Feed_Forward(config.d_model)
def forward(self,x): # batch_size * seq_len 并且 x 的类型不是tensor，是普通list
x += self.positional_encoding(x.shape[1],config.d_model)
# print("After positional_encoding: {}".format(x.size()))
return output```

## 5.Decoder

```class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.positional_encoding = Positional_Encoding(config.d_model)
self.feed_forward = Feed_Forward(config.d_model)
def forward(self,x,encoder_output): # batch_size * seq_len 并且 x 的类型不是tensor，是普通list
# print(x.size())
x += self.positional_encoding(x.shape[1],config.d_model)
# print(x.size())
# 第一个 sub_layer
# 第二个 sub_layer
# 第三个 sub_layer
return output```

## 6.Transformer

Output模块的 Linear 和 Softmax 的实现也包含在下面的代码中

```class Transformer_layer(nn.Module):
def __init__(self):
super(Transformer_layer, self).__init__()
self.encoder = Encoder()
self.decoder = Decoder()
def forward(self,x):
x_input,x_output = x
encoder_output = self.encoder(x_input)
decoder_output = self.decoder(x_output,encoder_output)
return (encoder_output,decoder_output)
class Transformer(nn.Module):
def __init__(self,N,vocab_size,output_dim):
super(Transformer, self).__init__()
self.embedding_input = Embedding(vocab_size=vocab_size)
self.embedding_output = Embedding(vocab_size=vocab_size)
self.output_dim = output_dim
self.linear = nn.Linear(config.d_model,output_dim)
self.softmax = nn.Softmax(dim=-1)
self.model = nn.Sequential(*[Transformer_layer() for _ in range(N)])
def forward(self,x):
x_input , x_output = x
x_input = self.embedding_input(x_input)
x_output = self.embedding_output(x_output)
_ , output = self.model((x_input,x_output))
output = self.linear(output)
output = self.softmax(output)
return output```

## 完整代码

```# @Author:Yifx
# @Contact: [email protected]
# @Time:2021/9/16 20:02
# @Software: PyCharm
"""

"""
import torch
import torch.nn as nn
import numpy as np
import math
class Config(object):
    """Global hyper-parameters for the toy Transformer.

    The toy vocabulary has 6 tokens (ids 0-5); UNK doubles as the
    padding token.
    """

    def __init__(self):
        self.vocab_size = 6
        self.d_model = 20
        # n_heads was referenced by the assert below but never defined in
        # the original; it must divide d_model, dim_k and dim_v.
        self.n_heads = 4
        assert self.d_model % self.n_heads == 0
        self.dim_k = self.d_model      # total (not per-head) Q/K projection size
        self.dim_v = self.d_model      # total (not per-head) V projection size
        self.padding_size = 30         # sequences are right-padded to this length
        self.UNK = 5                   # oov/padding token id (simplified)
        self.N = 6                     # number of stacked encoder/decoder layers
        self.p = 0.1                   # dropout probability


config = Config()
class Embedding(nn.Module):
    """Token embedding over a raw list-of-lists batch.

    Each ragged sequence is padded in place with ``config.UNK`` up to
    ``config.padding_size`` before the lookup.
    """

    def __init__(self, vocab_size):
        super(Embedding, self).__init__()
        # Missing in the original: the actual lookup table.
        self.embedding = nn.Embedding(vocab_size, config.d_model, padding_idx=config.UNK)

    def forward(self, x):
        # x: list of lists of token ids (plain python lists, not tensors).
        # UNK is the index used for oov/padding -- simplified here.
        for seq in x:
            pad = config.padding_size - len(seq)
            if pad > 0:
                seq.extend([config.UNK] * pad)
            else:
                del seq[config.padding_size:]  # truncate over-long sequences
        x = self.embedding(torch.tensor(x))  # batch_size * padding_size * d_model
        return x
class Positional_Encoding(nn.Module):
    """Sinusoidal positional encodings (sin on even dims, cos on odd dims)."""

    def __init__(self, d_model):
        super(Positional_Encoding, self).__init__()
        self.d_model = d_model

    def forward(self, seq_len, embedding_dim):
        pe = np.zeros((seq_len, embedding_dim))
        for pos in range(seq_len):
            for i in range(embedding_dim):
                angle = pos / (10000 ** (2 * i / self.d_model))
                pe[pos][i] = math.sin(angle) if i % 2 == 0 else math.cos(angle)
        # The original forgot this return -- callers received None.
        return torch.tensor(pe, dtype=torch.float32)
class Mutihead_Attention(nn.Module):
    """Multi-head scaled dot-product attention.

    Q and K are projected from ``x``; V is projected from ``y`` (the
    tutorial's design -- cross-attention passes the encoder output as y).
    """

    def __init__(self, d_model, dim_k, dim_v, n_heads):
        super(Mutihead_Attention, self).__init__()
        self.dim_v = dim_v
        self.dim_k = dim_k
        self.n_heads = n_heads
        self.q = nn.Linear(d_model, dim_k)
        self.k = nn.Linear(d_model, dim_k)
        self.v = nn.Linear(d_model, dim_v)
        self.o = nn.Linear(dim_v, d_model)
        self.norm_fact = 1 / math.sqrt(d_model)

    def generate_mask(self, dim):
        # Sequence mask: True above the diagonal marks future time steps
        # the decoder must not see.
        matrix = np.ones((dim, dim))
        mask = torch.Tensor(np.tril(matrix))
        return mask == 0

    def forward(self, x, y, requires_mask=False):
        assert self.dim_k % self.n_heads == 0 and self.dim_v % self.n_heads == 0
        # n_heads * batch_size * seq_len * (dim per head)
        Q = self.q(x).reshape(-1, x.shape[0], x.shape[1], self.dim_k // self.n_heads)
        K = self.k(x).reshape(-1, x.shape[0], x.shape[1], self.dim_k // self.n_heads)
        V = self.v(y).reshape(-1, y.shape[0], y.shape[1], self.dim_v // self.n_heads)
        attention_score = torch.matmul(Q, K.permute(0, 1, 3, 2)) * self.norm_fact
        if requires_mask:
            mask = self.generate_mask(x.shape[1])
            # masked_fill is not in-place: the result must be reassigned.
            attention_score = attention_score.masked_fill(mask, float("-inf"))
        # Softmax was missing in the original attention computation.
        attention_score = torch.softmax(attention_score, dim=-1)
        output = torch.matmul(attention_score, V).reshape(y.shape[0], y.shape[1], -1)
        output = self.o(output)
        return output
class Feed_Forward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Linear."""

    def __init__(self, input_dim, hidden_dim=2048):
        super(Feed_Forward, self).__init__()
        self.L1 = nn.Linear(input_dim, hidden_dim)
        self.L2 = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        hidden = torch.relu(self.L1(x))
        return self.L2(hidden)
class Add_Norm(nn.Module):
    """Residual connection + dropout, then LayerNorm (the "Add & Norm" step)."""

    def __init__(self):
        # The class header was lost in the original, and Module.__init__
        # was never called -- assigning self.dropout then fails.
        super(Add_Norm, self).__init__()
        self.dropout = nn.Dropout(config.p)

    def forward(self, x, sub_layer, **kwargs):
        sub_output = sub_layer(x, **kwargs)
        x = self.dropout(x + sub_output)
        # NOTE(review): a fresh LayerNorm is built on every call, so its
        # affine parameters are never trained -- preserved from the original.
        layer_norm = nn.LayerNorm(x.size()[1:])
        return layer_norm(x)
class Encoder(nn.Module):
    """One encoder layer: self-attention + feed-forward, each in Add & Norm."""

    def __init__(self):
        super(Encoder, self).__init__()
        self.positional_encoding = Positional_Encoding(config.d_model)
        # Missing in the original: the attention and add&norm sub-layers.
        self.muti_atten = Mutihead_Attention(config.d_model, config.dim_k, config.dim_v, config.n_heads)
        self.feed_forward = Feed_Forward(config.d_model)
        self.add_norm = Add_Norm()

    def forward(self, x):  # x: batch_size * seq_len * d_model (already embedded)
        x += self.positional_encoding(x.shape[1], config.d_model)
        # Self-attention: V also comes from x (y=x).
        output = self.add_norm(x, self.muti_atten, y=x)
        output = self.add_norm(output, self.feed_forward)
        return output
# In the Decoder, the encoder's output is passed in as the second attention
# argument (the source of V), while the decoder's own input supplies Q and K.
# This works because the Q.K^T step yields batch_size * seq_len * seq_len;
# with matching seq_len, every output token attends over every input token.
class Decoder(nn.Module):
    """One decoder layer: masked self-attention, attention against the
    encoder output, then feed-forward -- each wrapped in Add & Norm."""

    def __init__(self):
        super(Decoder, self).__init__()
        self.positional_encoding = Positional_Encoding(config.d_model)
        self.muti_atten = Mutihead_Attention(config.d_model, config.dim_k, config.dim_v, config.n_heads)
        self.feed_forward = Feed_Forward(config.d_model)
        self.add_norm = Add_Norm()

    def forward(self, x, encoder_output):  # x: batch_size * seq_len * d_model
        x += self.positional_encoding(x.shape[1], config.d_model)
        # 1st sub-layer: masked self-attention over the decoder input.
        output = self.add_norm(x, self.muti_atten, y=x, requires_mask=True)
        # 2nd sub-layer: V comes from the encoder output.
        output = self.add_norm(output, self.muti_atten, y=encoder_output, requires_mask=True)
        # 3rd sub-layer: position-wise feed-forward.
        output = self.add_norm(output, self.feed_forward)
        return output
class Transformer_layer(nn.Module):
    """Pairs one Encoder with one Decoder so layers stack via nn.Sequential."""

    def __init__(self):
        super(Transformer_layer, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, x):
        source, target = x
        enc_out = self.encoder(source)
        dec_out = self.decoder(target, enc_out)
        return (enc_out, dec_out)
class Transformer(nn.Module):
def __init__(self,N,vocab_size,output_dim):
super(Transformer, self).__init__()
self.embedding_input = Embedding(vocab_size=vocab_size)
self.embedding_output = Embedding(vocab_size=vocab_size)
self.output_dim = output_dim
self.linear = nn.Linear(config.d_model,output_dim)
self.softmax = nn.Softmax(dim=-1)
self.model = nn.Sequential(*[Transformer_layer() for _ in range(N)])
def forward(self,x):
x_input , x_output = x
x_input = self.embedding_input(x_input)
x_output = self.embedding_output(x_output)
_ , output = self.model((x_input,x_output))
output = self.linear(output)
output = self.softmax(output)
return output```