Transformer 是一种基于**自注意力机制(Self-Attention)**的深度学习模型,最初由 Vaswani 等人在 2017 年的论文《Attention is All You Need》中提出。它彻底改变了自然语言处理(NLP)领域,成为许多任务(如机器翻译、文本生成、文本分类等)的基石。与传统的循环神经网络(RNN)和卷积神经网络(CNN)相比,Transformer 具有以下优势:
Transformer 模型是基于 Seq2Seq(Sequence-to-Sequence,序列到序列)架构的,但它对传统的 Seq2Seq 模型进行了重大改进。

经典 Seq2Seq 模型由两部分组成:
经典 Seq2Seq 模型通常使用 RNN(如 LSTM 或 GRU) 作为编码器和解码器的基础结构。
Transformer 基于 Seq2Seq 架构,但它通过引入自注意力机制(Self-Attention)和并行化处理,解决了经典 Seq2Seq 模型的许多局限性:

import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import matplotlib.pyplot as plt
import numpy as np
import copy
from pyexpat import features
将离散的词汇映射到连续的向量空间,即词嵌入(Word Embedding)
lass Embeddings(nn.Module):
def __init__(self, d_model, vocab):
# d_model是词嵌入的维度,vocab是词表大小
super(Embeddings, self).__init__()
self.lut = nn.Embedding(vocab, d_model)
self.d_model = d_model
def forward(self, x):
# 输入进模型的文本通过词汇映射后的张量
return self.lut(x) * math.sqrt(self.d_model) # 词嵌入乘以根号d_model
PositionalEncoding类:为词嵌入添加位置信息,因为Transformer本身无法处理序列的顺序信息,使用正弦和余弦函数生成位置编码,并将其加到词嵌入上
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
# 创建一个位置编码矩阵
pe = torch.zeros(max_len, d_model) # d_model是词嵌入的维度
# 初始化一个绝对编码矩阵,max行1列
position = torch.arange(0, max_len).unsqueeze(1)
# 初始化一个相对编码矩阵,max行d_model列
div_term = torch.exp(torch.arange(0, d_model, 2) *
-(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
# 二维扩展成三维
pe = pe.unsqueeze(0)
# 因为位置编码是固定的,不需要在训练过程中进行调整
self.register_buffer('pe', pe)
def forward(self, x):
# x是词嵌入后的张量
# pe编码太长了,将第二个维度缩小成与句子长度同等
x = x + self.pe[:, :x.size(1)]
return self.dropout(x)
掩码生成和注意力机制
subsequent_mask)np.triu 生成一个上三角矩阵,其中对角线以上的元素为 1,其余为 0。1 - sub_mask 将矩阵反转,使得对角线以上的元素为 0,其余为 1。(1, size, size) 的掩码张量,用于屏蔽未来位置的信息。attention)query 和 key 的点积,并除以 sqrt(d_k) 进行缩放,得到注意力分数。-1e9),以屏蔽这些位置。softmax 操作,得到注意力权重。dropout,对注意力权重进行随机丢弃。value 相乘,得到最终的注意力表示。# 生成掩码张量
def subsequent_mask(size):
attn_shape = (1, size, size)
sub_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
return torch.from_numpy(1 - sub_mask)
# 构建注意力函数,输入Q、K、V张量,返回注意力表示和注意力权重张量
def attention(query, key, value, mask=None, dropout=None):
# 先将query的最后一个维度提取出来,就是词嵌入的维度
d_k = query.size(-1)
scores = torch.matmul(query, key.transpose(-2,-1)) / math.sqrt(d_k)
# 判断是否使用掩码张量
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9) # 填充0的地方设置为-1e9
# scores的最后一维进行softmax操作
p_attn = F.softmax(scores, dim=-1)
if dropout is not None:
p_attn = dropout(p_attn)
# 完成p_attn和value张量的乘法,返回query的注意力表示
return torch.matmul(p_attn, value), p_attntensor([[[1., 0., 0., 0.],
[0., 1., 0., 0.],
[0., 0., 1., 0.],
[0., 0., 0., 1.]],
[[1., 0., 0., 0.],
[0., 1., 0., 0.],
[0., 0., 1., 0.],
[0., 0., 0., 1.]]], grad_fn=<SoftmaxBackward0>)说明每个词只关注自己,因为:使用了 subsequent_mask,它屏蔽了未来位置的信息。因此,每个词只能看到自己及其之前的位置,而不能看到之后的位置
输入的 query, key, 和 value 都是相同的,并且这些张量的内容使得注意力分数在经过 softmax 后变成了单位矩阵
Q、K、V:计算结果代表query在key和value作用下的表示;Q是一段准备被概括的文本,K是给出的提示;V是大脑对提示K的延申
希望得到的是:伴随着给定文本Q和K,得到的V的信息越来越接近原始文本Q(用简单的V把Q表达了)
实现克隆函数,需要用到多个结构相同的线性层
def clones(module, N):
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
# 多头注意力机制的类
class MultiHeadedAttention(nn.Module):
def __init__(self, head, embedding_dim, dropout=0.1):
super(MultiHeadedAttention, self).__init__()
# 词嵌入维度必须能被头数整除
assert embedding_dim % head == 0
self.d_k = embedding_dim // head
self.head = head
self.embedding_dim = embedding_dim
# 获得四个线性层,分别是Q、K、V以及最终输出线型层
self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)
self.attn = None
self.dropout = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask=None):
if mask is not None:
# 确保mask的维度是正确的
mask = mask.unsqueeze(1)
# 得到batch_size,[batch_size, sequence_length, embedding_dim]
batch_size = query.size(0)
# 将输入的 query、key 和 value 张量通过线性变换后,重新组织成适合多头注意力机制计算的形式
query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)
for model, x in zip(self.linears, (query, key, value))]
# 将每个头的输出传入到注意力层
x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)
# 得到每个头的计算结果是4维的,需要将其重新组合成原始的输出形式,再转置回来。
x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.embedding_dim)
# 将线性变换后的结果通过线性层,得到最终的输出
return self.linears[-1](x)
后续代码:
head = 8
embedding_dim = 512
dropout = 0.1
query = key = value = pe_result # pe_result是经过位置编码的词嵌入张量
size = 4
mask = subsequent_mask(size)
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask) # 这是多头注意力机制的计算结果
print('多头注意力机制的计算结果mha_result',mha_result)
print(mha_result.shape)
# 构建前馈全连接网络类
class PositionwiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
self.w_1 = nn.Linear(d_model, d_ff)
self.w_2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
# x是来自上一层的输出
return self.w_2(self.dropout(F.relu(self.w_1(x))))
d_model = 512
d_ff = 64
dropout = 0.1
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x) # 经过前馈全连接层的计算,得到新的表示
print('经过前馈全连接层的再次拟合ff_result',ff_result)
print(ff_result.shape)
# 构建规范化层类
class LayerNorm(nn.Module):
def __init__(self, features, eps=1e-6):
super(LayerNorm, self).__init__()
# 创建两个可学习的参数, 用于缩放和偏移
self.a_2 = nn.Parameter(torch.ones(features))
self.b_2 = nn.Parameter(torch.zeros(features))
self.eps = eps
def forward(self, x):
# x代表上一层网络的输出
mean = x.mean(-1, keepdim=True)
std = x.std(-1, keepdim=True)
return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x) # 得到规范化后的结果
print('规范化后的结果ln_result',ln_result)
print(ln_result.shape)
# 子层连接结构
class SublayerConnection(nn.Module):
def __init__(self, size, dropout=0.1):
super(SublayerConnection, self).__init__()
self.norm = LayerNorm(size)
self.dropout = nn.Dropout(p=dropout)
self.size = size
def forward(self, x, sublayer):
# x是上一层的输出,sublayer是子层
return x + self.dropout(sublayer(self.norm(x)))
size = d_model = 512
head = 8
dropout = 0.1
x = pe_result
mask = subsequent_mask(4)
self_attn = MultiHeadedAttention(head, d_model)
sublayer = lambda x: self_attn(x, x, x, mask)
sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)
print('子层连接结构sc_result',sc_result)
# 编码层的类
class Encoderlayer(nn.Module):
def __init__(self, size, self_attn, feed_forward, dropout):
super(Encoderlayer, self).__init__()
self.self_attn = self_attn
self.feed_forward = feed_forward
self.size = size
# 编码器有两个子层连接结构
self.sublayer = clones(SublayerConnection(size, dropout), 2)
def forward(self, x, mask):
# x是上一层的传入张量,mask是掩码张量
# 先让x经过第一个子层连接结构,内部包含多头注意力机制子层,再经过第二个子层连接结构,内部包含前馈全连接层子层
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
return self.sublayer[1](x, self.feed_forward)
size = d_model = 512
head = 8
x = pe_result # [2,4,512], size 设置为 4 (序列长度)是为了确保掩码矩阵能够正确应用于自注意力机制中的每一个位置
dropout = 0.1
self_attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
mask = subsequent_mask(4) # mask 形状为 (1, 4, 4),mask 会广播到与输入张量相同的批次大小(2, 4, 4)
el = Encoderlayer(size, self_attn, ff, dropout)
el_result = el(x, mask)
print('编码器层的计算结果el_result',el_result)
# 构建编码器类
class Encoder(nn.Module):
def __init__(self, layer, N):
super(Encoder, self).__init__()
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, mask):
for layer in self.layers:
x = layer(x, mask)
return self.norm(x)
size = d_model = 512
d_ff = 64
head = 8
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
dropout = 0.2
layer = Encoderlayer(size, c(attn), c(ff), dropout)
N = 8
mask = torch.zeros(1, 4, 4)
en = Encoder(layer, N)
en_result = en(x, mask)
print('编码器的计算结果en_result',en_result)
print(en_result.shape)
# 在大多数情况下,编码器的掩码张量为全0,而解码器的掩码张量为非0
# 使用DecoderLayer的类实现解码器层
class DecoderLayer(nn.Module):
def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
"""初始化函数的参数有5个, 分别是size,代表词嵌入的维度大小, 同时也代表解码器层的尺寸,
第二个是self_attn,多头自注意力对象,也就是说这个注意力机制需要Q=K=V,
第三个是src_attn,多头注意力对象,这里Q!=K=V, 第四个是前馈全连接层对象,最后就是droupout置0比率.
"""
super(DecoderLayer, self).__init__()
# 在初始化函数中, 主要就是将这些输入传到类中
self.size = size
self.self_attn = self_attn
self.src_attn = src_attn
self.feed_forward = feed_forward
self.dropout = dropout
# 按照结构图使用clones函数克隆三个子层连接对象.
self.sublayer = clones(SublayerConnection(size, dropout), 3)
def forward(self, x, memory, source_mask, target_mask):
"""forward函数中的参数有4个,分别是来自上一层的输入x,
来自编码器层的语义存储变量mermory, 以及源数据掩码张量和目标数据掩码张量.
"""
m = memory
# 将x传入第一个子层结构,第一个子层结构的输入分别是x和self-attn函数,因为是自注意力机制,所以Q,K,V都是x,
# 最后一个参数是目标数据掩码张量,这时要对目标数据进行遮掩,因为此时模型可能还没有生成任何目标数据,
# 比如在解码器准备生成第一个字符或词汇时,我们其实已经传入了第一个字符以便计算损失,
# 但是我们不希望在生成第一个字符时模型能利用这个信息,因此我们会将其遮掩,同样生成第二个字符或词汇时,
# 模型只能使用第一个字符或词汇信息,第二个字符以及之后的信息都不允许被模型使用.
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, target_mask))
# 接着进入第二个子层,这个子层中常规的注意力机制,q是输入x; k,v是编码层输出memory,
# 同样也传入source_mask,但是进行源数据遮掩的原因并非是抑制信息泄漏,而是遮蔽掉对结果没有意义的字符而产生的注意力值,
# 以此提升模型效果和训练速度. 这样就完成了第二个子层的处理.
x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, source_mask))
# 最后一个子层就是前馈全连接子层,经过它的处理后就可以返回结果.这就是我们的解码器层结构.
return self.sublayer[2](x, self.feed_forward)
size = d_model = 512
head = 8
d_ff = 64
dropout = 0.2
self_attn = src_attn = MultiHeadedAttention(head, d_model, dropout) # 为了方便,这里将self_attn和src_attn设置为同一个对象
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
x = pe_result
memory = en_result # 这是编码器的计算结果
mask = torch.zeros(1, 4, 4)
source_mask = target_mask = mask
dl = DecoderLayer(size, self_attn, src_attn, ff, dropout)
dl_result = dl(x, memory, source_mask, target_mask)
print('解码器层的计算结果dl_result',dl_result)
print(dl_result.shape)
# 解码器的作用:根据编码器的结果以及上一次预测的结果, 对下一次可能出现的'值'进行特征表示!!
class Decoder(nn.Module):
def __init__(self, layer, N):
super(Decoder, self).__init__()
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, memory, source_mask, target_mask):
for layer in self.layers:
x = layer(x, memory, source_mask, target_mask)
return self.norm(x)
size = d_model = 512
d_ff = 64
head = 8
dropout = 0.2
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
layer = DecoderLayer(size, c(attn), c(attn), c(ff), dropout)
N = 8
x = pe_result
memory = en_result
mask = torch.zeros(1, 4, 4)
source_mask = target_mask = mask
de = Decoder(layer, N)
de_result = de(x, memory, source_mask, target_mask)
print('解码器的计算结果de_result',de_result)
print(de_result.shape)
# 构建Genrator类
class Generator(nn.Module):
def __init__(self, d_model, vocab_size):
super(Generator, self).__init__()
# 维度从d_model到vocab_size
self.project = nn.Linear(d_model, vocab_size)
def forward(self, x):
return F.log_softmax(self.project(x), dim=-1)
d_model = 512
vocab_size = 1000
x = de_result
gen = Generator(d_model, vocab_size)
gen_result = gen(x)
print('生成器的计算结果gen_result',gen_result)
print(gen_result.shape)
# 这个输出表示每个位置上每个词的概率分布,用于最终预测目标序列中的词,
# 根据输出后的向量,和目标词的词表,我们可以得到一个概率分布,从而得到预测结果.
# 使用EncoderDecoder类来实现编码器-解码器结构
class EncoderDecoder(nn.Module):
def __init__(self, encoder, decoder, source_embed, target_embed, generator):
"""初始化函数中有5个参数, 分别是编码器对象, 解码器对象,
源数据嵌入函数, 目标数据嵌入函数, 以及输出部分的类别生成器对象
"""
super(EncoderDecoder, self).__init__()
# 将参数传入到类中
self.encoder = encoder
self.decoder = decoder
self.src_embed = source_embed
self.tgt_embed = target_embed
self.generator = generator
def forward(self, source, target, source_mask, target_mask):
"""在forward函数中,有四个参数, source代表源数据, target代表目标数据,
source_mask和target_mask代表对应的掩码张量"""
# 在函数中, 将source, source_mask传入编码函数, 得到结果后,
# 与source_mask,target,和target_mask一同传给解码函数.
return self.decode(self.encode(source, source_mask), source_mask,
target, target_mask)
def encode(self, source, source_mask):
"""编码函数, 以source和source_mask为参数"""
# 使用src_embed对source做处理, 然后和source_mask一起传给self.encoder
return self.encoder(self.src_embed(source), source_mask)
def decode(self, memory, source_mask, target, target_mask):
"""解码函数, 以memory即编码器的输出, source_mask, target, target_mask为参数"""
# 使用tgt_embed对target做处理, 然后和source_mask, target_mask, memory一起传给self.decoder
return self.decoder(self.tgt_embed(target), memory, source_mask, target_mask)
# source_mask用于遮蔽输入句子中的某些部分,主要用于处理变长的输入序列,确保模型不会关注到填充的部分或对结果没有意义的字符或词
# target_mask用于解码器侧,防止模型在生成目标序列时看到未来的信息
vocab_size = 1000
d_model = 512
encoder = en
decoder = de
source_embed = nn.Embedding(vocab_size, d_model)
target_embed = nn.Embedding(vocab_size, d_model)
generator = gen
source = target = torch.LongTensor([[100,2,421,508], [998,1,4,6]])
source_mask = target_mask = torch.zeros(1, 4, 4)
ed = EncoderDecoder(encoder, decoder, source_embed, target_embed, generator)
ed_result = ed(source, target, source_mask, target_mask)
# 这个输出表示解码器对输入序列的特征表示,还没有经过生成器映射到词汇表大小的维度
print('编码器-解码器的计算结果ed_result',ed_result)
print(ed_result.shape)
# 构建transformer模型
def make_model(source_vocab, target_vocab, N=6,
d_model=512, d_ff=2048, head=8, dropout=0.1):
"""该函数用来构建模型, 有7个参数,分别是源数据特征(词汇)总数,目标数据特征(词汇)总数,
编码器和解码器堆叠数,词向量映射维度,前馈全连接网络中变换矩阵的维度,
多头注意力结构中的多头数,以及置零比率dropout."""
# 首先得到一个深度拷贝命令,接下来很多结构都需要进行深度拷贝,
# 来保证他们彼此之间相互独立,不受干扰.
c = copy.deepcopy
# 实例化了多头注意力类,得到对象attn
attn = MultiHeadedAttention(head, d_model)
# 然后实例化前馈全连接类,得到对象ff
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
# 实例化位置编码类,得到对象position
position = PositionalEncoding(d_model, dropout)
# 根据结构图, 最外层是EncoderDecoder,在EncoderDecoder中,
# 分别是编码器层,解码器层,源数据Embedding层和位置编码组成的有序结构,
# 目标数据Embedding层和位置编码组成的有序结构,以及类别生成器层.
# 在编码器层中有attention子层以及前馈全连接子层,
# 在解码器层中有两个attention子层以及前馈全连接层.
model = EncoderDecoder(
Encoder(Encoderlayer(d_model, c(attn), c(ff), dropout), N),
Decoder(DecoderLayer(d_model, c(attn), c(attn),
c(ff), dropout), N),
nn.Sequential(Embeddings(d_model, source_vocab), c(position)),
nn.Sequential(Embeddings(d_model, target_vocab), c(position)),
Generator(d_model, target_vocab))
# 模型结构完成后,接下来就是初始化模型中的参数,比如线性层中的变换矩阵
# 这里一但判断参数的维度大于1,则会将其初始化成一个服从均匀分布的矩阵,
# 对于权重矩阵,良好的初始化可以帮助网络更快地收敛,并避免梯度消失或爆炸问题.对于权重矩阵(一维),良好的初始化可以帮助网络更快地收敛,并避免梯度消失或爆炸问题
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return model结果:



原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。