Advertisement

A deep dive into transformer‘s inner workings!

阅读量:

作者:禅与计算机程序设计艺术

1.简介

近年来,Transformer模型在NLP领域受到了广泛关注,并在文本处理领域得到了广泛应用。然而,要深入理解Transformer的工作原理,可能会感到理解起来有一定难度。因此,本文旨在通过从底层研究出发,深入剖析Transformer模型的结构、运行机制和实现原理,揭示其内部工作机制的奥秘。通过系统性地分析Transformer模型的结构、运行机制和实现原理,读者将能够更全面地理解其工作原理,并将其应用到更多样化的NLP任务中。

2.相关概念

首先,本文对一些相关的概念进行简单介绍:

Transformer模型

Transformer模型是以注意力机制为核心的序列到序列模型,其主要特点在于能够有效建立源序列与目标序列之间的映射关系。该模型被设计以解决包括机器翻译、图像描述和智能聊天机器人在内的多种序列到序列处理任务。

Attention机制

注意力机制是一种用于计算数据集中各元素之间相互关系的方法,其核心在于分析和处理元素间的关联性。该方法主要由三个关键组成部分构成:位置编码、查询生成以及键值对生成三个模块。其中,位置编码是可训练的矢量,用于将输入序列中的每个元素映射到一个向量表示,这种表示可以用于后续的处理和分析;查询生成模块和键值对生成模块则依赖于注意力机制进行计算,以捕捉和处理数据中的复杂关联。

Positional Encoding

Positional encoding是一种给输入序列或输出序列增加位置信息的方法。在Transformer模型中,特征提取过程中缺乏位置信息会导致信息丢失。为了解决这一问题,需要引入位置编码机制。该机制通过在不改变输入序列长度的前提下,增加输入序列的位置信息,从而提升模型进行位置预测的能力。位置编码通常采用正弦函数或余弦函数进行编码,具体公式如下所示:其中,PE_{pos}表示位置编码矩阵,pos代表当前的位置(position),d_{model}代表模型的维度(dimensionality of the model)。通常做法是将PE_{pos}矩阵作为最后一层的权重矩阵加到Embedding层之后,然后与Positional Encoding进行相乘。

Multi-head attention

Multi-head attention旨在通过计算多个子空间内的输入向量之间的关联性,将这些结果整合后作为输出。在注意力机制中,每个子空间对应一个Head,从而为模型提供了更复杂的特征表示。通过这种方式,模型的表达能力得到了显著的提升。

Self-attention

Self-attention可以被视为一种自我关注机制,通过在不同层次的输入向量上应用相同的注意力机制进行计算。通过这种方式,可以有效捕捉到全局依赖关系。

Feedforward Network

Feedforward Network基于两个全连接层构建而成,属于深度神经网络。其中,第一层负责接收并处理输入特征,第二层则通过非线性激活函数对输入特征进行转换并输出结果。FFN被设计以学习复杂的高阶表示。

Residual connection

Residual connection represents a mechanism for incorporating skip connections within deep neural networks. By enabling direct connections from the output of preceding layers to the input of subsequent layers without summation operations, skip connections facilitate enhanced convergence properties and more efficient training processes. Through the inclusion of residual components, networks can achieve improved convergence rates and more effective learning dynamics.

Dropout

Dropout是一种防止过拟合的方法。在训练阶段,随机阻止部分神经元参与训练,从而降低过拟合的风险。Dropout率是一个超参数,用于调节dropout的发生概率。

3.核心算法原理和具体操作步骤以及数学公式讲解

模型架构

Transformer模型的结构较为复杂,但总体上可划分为四个主要组件:首先,Encoder模块负责接收输入序列并生成经过编码的输出;其次,通过计算Encoder输出与Decoder输入之间的注意力关系,可以得到Decoder的输出;随后,Decoder模块利用其输出进行解码,最终生成需要的预测结果;最后,Output layer将Decoder的输出映射至词表空间,从而得到具体的预测值。

图1: Transformer模型结构示意图

Embedding层

在输入序列处理过程中,通过Embedding层,每个单词被映射到固定维度的向量空间中。通过训练,Embedding层能够学习出具有强可塑性和意义的词嵌入表示,从而实现语义表示。在训练过程中,Embedding层的参数通过反向传播算法进行动态更新。

图2展示了在Transformer模型架构中,嵌入层的作用机制。该层接收并处理符号索引序列作为输入,并生成相应的符号嵌入矩阵。值得注意的是,尽管不同词的嵌入向量不依赖于上下文信息,但每个词的嵌入表示都是独一无二的,这确保了模型在处理词语时的精确识别能力。

Positional Encoding

位置编码在Encoder和Decoder之间传递位置信息。位置编码的主要目的是帮助Transformer利用位置信息,而非仅关注单词内容。在Transformer模型中,位置编码矩阵(PE矩阵)在Embedding层之后加权,并最终输入到各个子层中。PE矩阵的每一行代表一个位置,每一列代表一个向量维度。

图3: Positional Encoding矩阵的结构示意图。

Encoder

Transformer模型中的编码器组件主要负责将原始输入序列转换为更抽象的形式,从而提取关键信息。如图1所示,编码器中存在两条路径。左侧路径代表正向编码路径,右侧路径代表反向编码路径。这两条路径所呈现的特征分别为:

Masked自注意力机制:在Masked自注意力机制中,每个位置只能获取前序位置的信息,从而限制了信息的流动。具体而言,通过遮蔽其他位置的向量信息,可以实现这一目标。在这一机制下,每个位置仅能观察到自己的上下文信息,而无法获取其他位置的信息。具体过程如下:

对Embedding层的输出施加位置编码,生成位置嵌入向量PE。随后,将经过位置编码的Embedding层输出与PE矩阵进行按列求和,得到位置编码PET。通过将结果与PET矩阵按行相乘来完成计算。接着,使用Softmax函数对每个位置的注意力权重α进行归一化处理。最后,利用α对上一阶段的向量进行加权求和,生成新的向量。将所有位置的更新后的向量进行连接,最终得到编码器输出结果。

多头注意力机制:编码器在每一步仅调用一次自注意力机制,但多头自注意力模块能够同时整合所有路径信息。该模块不仅具备能力去学习不同子空间之间的相互影响,更有效地捕捉信息间的关联性。具体而言,多头自注意力模块首先将输入序列划分为多个子空间,然后通过多头机制分别处理每个子空间的信息,最后通过加权融合形成最终的表示。

对Embedding层的输出施加位置编码,得到位置嵌入向量PE。将Embedding层的输出按不同的子空间划分为查询、键和值向量Q、K、V。将Embedding层的输出与PE矩阵按列相乘,得到位置编码矩阵PET。将Q、K、V分别与PET矩阵进行点积运算,随后进行分割和重新组合,以获得最终的注意力输出。将所有子空间中的注意力结果拼接起来,形成最终的encoder输出。

Decoder

在Transformer模型中,解码器组件主要负责推导出目标序列的概率分布。如图1所示,解码器主要包含编码器输出的处理以及后续的解码器结构。

因为解码器本身就是生成序列,它无法获取其他位置的信息。因此,在计算掩码时,我们仅允许获取当前时间步及其之前所有时间步的信息。

  • 与编码器相似,通过将Embedding层的输出与PE矩阵的按列求和,得到位置编码PE^T。
  • 经过按行与PE^T矩阵相乘运算,计算出注意力权重。
  • 通过Softmax归一化函数,对每个位置的注意力权重进行归一化处理。
  • 通过α将之前的向量按权重进行加权,得到新的向量。
  • 将所有位置的新向量进行串联整合,最终形成decoder的输出结果。
  1. 全连接层:将解码器输出输入到全连接层进行处理,从而生成预测序列。

超参优化

在训练Transformer模型时,可以通过调整以下超参数来优化模型效果:

  1. 注意力机制中的头数:在编码器中,直接影响注意力结果的计算所使用的头数。
  2. Feedforward网络的规模: Feedforward网络的规模直接决定了模型的表达能力。
  3. 学习率和权重衰减参数:学习率和权重衰减参数直接影响模型的训练速度和训练效率。
  4. Dropout比率: Dropout比率直接影响模型的泛化能力。

4.具体代码实例和解释说明

配置环境

在安装pytorch和torchtext等库之前,请确保已安装了支持GPU的Anaconda版本。如果尚未安装,请按照以下步骤进行操作:首先,创建conda虚拟环境时,请执行以下操作:conda create --name torch python=3.7。接着,虚拟环境激活后,请运行:source activate torch。安装pytorch库时,请执行:conda install pytorch torchvision cudatoolkit=10.0 -c pytorch。最后,安装torchtext时,请使用以下命令:pip install torchtext==0.4.0。

数据准备

本文采用torchtext库读取数据集,通过运行以下代码,导入必要的库并加载数据集:

复制代码
    import torch
    from torchtext import data
    from torchtext import datasets
    import random
    
    SEED = 1234
    
    random.seed(SEED)
    torch.manual_seed(SEED)
    torch.backends.cudnn.deterministic = True
    
      
      
      
      
      
      
      
      
      
    
    代码解读

从IMDB数据集中加载该数据集,该数据集是经典的英文情绪分类数据集,包含25000条训练样本和25000条测试样本。

复制代码
    TEXT = data.Field(tokenize='spacy', lower=True, include_lengths=True)
    LABEL = data.LabelField()
    
    train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)
    
    print('Number of training examples:', len(train_data))
    print('Number of testing examples:', len(test_data))
    
      
      
      
      
      
      
    
    代码解读

打印数据的统计信息:

复制代码
    print(vars(train_data[0]))
    
    >> {'text': ['plot', 'is', 'well', 'done', ',', 'with', 'a','surprise', 'ending', '.', "the", 'acting', 'is', 'top', 'notch', ',', 'although', 'there', "'s",'something', 'off', 'about','some', 'of', 'the', 'characters', '.'], 'label': 'pos', 'length': [4, 2, 3, 5, 1, 4, 2, 4, 3, 1, 2, 3, 2, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}
    
      
      
    
    代码解读

数据处理

为了实现将输入序列映射到固定维度向量的目标,需要定义一个词表。通过调用build_vocab()方法,可以实现词表的构建。

复制代码
    MAX_VOCAB_SIZE = 25_000
    
    TEXT.build_vocab(train_data, max_size=MAX_VOCAB_SIZE)
    LABEL.build_vocab(train_data)
    
      
      
      
    
    代码解读

创建模型

该用户成功搭建了一个基础的Transformer架构。该模型架构由多个关键组件构成,包括词嵌入层、位置编码层、编码器模块和解码器模块。

复制代码
    class Transformer(nn.Module):
    def __init__(
        self, 
        embedding_dim, 
        vocab_size, 
        num_heads, 
        pf_dim, 
        dropout, 
        device, 
        max_seq_len=80
    ):
        super().__init__()
    
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.pf_dim = pf_dim
        self.dropout = dropout
        self.device = device
    
        # Layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.pos_encoding = positional_encoding(
            max_seq_len, 
            embedding_dim
        )
    
        self.enc_layers = nn.ModuleList([
            EncoderLayer(
                embedding_dim, 
                num_heads, 
                pf_dim, 
                dropout, 
                device
            ) for _ in range(transformer_layer)
        ])
    
        self.fc = nn.Linear(embedding_dim, vocab_size)
    
    def forward(self, x):
    
        # Create embeddings
        emb = self.embedding(x)
    
        # Add position encoding
        pe = self.pos_encoding[:, :emb.shape[1], :]
        emb += pe
    
        # Pass through encoder layers
        enc_output = emb
        for enc_layer in self.enc_layers:
            enc_output, attentions = enc_layer(enc_output)
    
        # Flatten output to pass through fully connected layer
        fc_input = enc_output.flatten(start_dim=1)
        fc_output = self.fc(fc_input)
    
        return fc_output, attentions
    
    class EncoderLayer(nn.Module):
    def __init__(
        self, 
        embedding_dim, 
        num_heads, 
        pf_dim, 
        dropout, 
        device
    ):
        super().__init__()
    
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.pf_dim = pf_dim
        self.dropout = dropout
        self.device = device
    
        # Layers
        self.self_attn_layer_norm = LayerNorm(embedding_dim)
        self.ff_layer_norm = LayerNorm(embedding_dim)
    
        self.self_attention = MultiHeadAttention(
            embedding_dim, 
            num_heads, 
            dropout, 
            device
        )
    
        self.positionwise_feedforward = PointWiseFeedForwardNetwork(
            embedding_dim, 
            pf_dim, 
            dropout
        )
    
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        # Extract inputs and outputs
        query, key, value = (
            x, x, x
        )
    
        # Apply self attention
        norm_x = self.self_attn_layer_norm(query + self.dropout((
            self.self_attention(query, key, value))))
    
        # Apply point wise feedforward network
        ptwise_fc_out = self.positionwise_feedforward(norm_x)
    
        # Output
        out = self.dropout(ptwise_fc_out) + norm_x
    
        return out, None
    
    class MultiHeadAttention(nn.Module):
    def __init__(
        self, 
        hid_dim, 
        n_heads, 
        dropout, 
        device
    ):
        super().__init__()
    
        assert hid_dim % n_heads == 0
    
        self.hid_dim = hid_dim
        self.n_heads = n_heads
        self.head_dim = hid_dim // n_heads
    
        self.fc_q = nn.Linear(hid_dim, hid_dim)
        self.fc_k = nn.Linear(hid_dim, hid_dim)
        self.fc_v = nn.Linear(hid_dim, hid_dim)
    
        self.fc_o = nn.Linear(hid_dim, hid_dim)
    
        self.dropout = nn.Dropout(dropout)
    
        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)
    
    def forward(self, query, key, value):
        batch_size = query.shape[0]
    
        Q = self.fc_q(query).view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = self.fc_k(key).view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = self.fc_v(value).view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
    
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2))/self.scale
    
        attn = torch.softmax(energy, dim=-1)
    
        x = torch.matmul(attn, V)
    
        x = x.permute(0, 2, 1, 3).contiguous()
    
        x = x.view(batch_size, -1, self.hid_dim)
    
        x = self.fc_o(x)
    
        return x
    
    class PointWiseFeedForwardNetwork(nn.Module):
    def __init__(
        self, 
        hid_dim, 
        pf_dim, 
        dropout
    ):
        super().__init__()
    
        self.hid_dim = hid_dim
        self.pf_dim = pf_dim
    
        self.fc_1 = nn.Conv1d(in_channels=hid_dim, out_channels=pf_dim, kernel_size=1)
        self.fc_2 = nn.Conv1d(in_channels=pf_dim, out_channels=hid_dim, kernel_size=1)
    
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
    
        x = x.permute(0, 2, 1)
    
        x = self.dropout(F.relu(self.fc_1(x)))
        x = self.fc_2(x)
    
        x = x.permute(0, 2, 1)
    
        return x
    
    def positional_encoding(
        seq_len, 
        embedding_dim
    ):
    
    pos_encode = np.array([
        [pos / np.power(10000, 2.*i/embedding_dim) for i in range(embedding_dim)]
        if pos!= 0 else np.zeros(embedding_dim) for pos in range(seq_len)])
    
    pos_encode[1:, 0::2] = np.sin(pos_encode[1:, 0::2])
    pos_encode[1:, 1::2] = np.cos(pos_encode[1:, 1::2])
    
    pad_idx = embedding_dim//2
    pos_encode[pad_idx:] = 0
    
    return torch.tensor(pos_encode).float().unsqueeze(0)
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

初始化模型

设置超参数:

复制代码
    INPUT_DIM = len(TEXT.vocab)
    EMBEDDING_DIM = 128
    OUTPUT_DIM = len(LABEL.vocab)
    NUM_HEADS = 8
    PF_DIM = 512
    DROPOUT = 0.1
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
      
      
      
      
      
      
    
    代码解读

创建模型对象:

复制代码
    model = Transformer(
    EMBEDDING_DIM, 
    INPUT_DIM, 
    NUM_HEADS, 
    PF_DIM, 
    DROPOUT, 
    DEVICE, 
    80
    ).to(DEVICE)
    
      
      
      
      
      
      
      
      
    
    代码解读

训练模型

设置超参数:

复制代码
    learning_rate = 5e-5
    optimizer = AdamW(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    epochs = 5
    
      
      
      
    
    代码解读

定义训练和验证函数:

复制代码
    def train(model, iterator, optimizer, criterion, clip):
    model.train()
    
    epoch_loss = 0
    total_accuracy = 0
    total_steps = len(iterator)
    
    for i, batch in enumerate(iterator):
        src = batch.text[0].to(DEVICE)
        trg = batch.label.to(DEVICE)
        batch_size = src.shape[0]
    
        optimizer.zero_grad()
    
        predictions, _ = model(src)
    
        loss = criterion(predictions, trg)
        accuracy = categorical_accuracy(predictions, trg)
    
        loss.backward()
    
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
    
        optimizer.step()
    
        epoch_loss += loss.item()
        total_accuracy += accuracy.item()
    
    return epoch_loss / total_steps, total_accuracy / total_steps
    
    def evaluate(model, iterator, criterion):
    model.eval()
    
    epoch_loss = 0
    total_accuracy = 0
    total_steps = len(iterator)
    
    with torch.no_grad():
        for i, batch in enumerate(iterator):
    
            src = batch.text[0].to(DEVICE)
            trg = batch.label.to(DEVICE)
    
            predictions, _ = model(src)
    
            loss = criterion(predictions, trg)
            accuracy = categorical_accuracy(predictions, trg)
    
            epoch_loss += loss.item()
            total_accuracy += accuracy.item()
    
    return epoch_loss / total_steps, total_accuracy / total_steps
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

设置迭代器:

复制代码
    BATCH_SIZE = 64
    
    train_iterator, valid_iterator = data.BucketIterator.splits(
    (train_data, test_data), 
    batch_size=BATCH_SIZE, 
    sort_within_batch=True, 
    sort_key=lambda x: len(x.text),
    device=DEVICE
    )
    
      
      
      
      
      
      
      
      
    
    代码解读

训练模型:

复制代码
    for epoch in range(epochs):
    start_time = time.time()
    
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion, 1)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    
    end_time = time.time()
    
    print(f'Epoch {epoch+1}: | Train Loss: {train_loss:.3f} | Train Accuracy: {train_acc*100:.2f}% | Val. Loss: {valid_loss:.3f} | Val. Accuracy: {valid_acc*100:.2f}% | Time: {(end_time - start_time)/60:.2f} min')
    
      
      
      
      
      
      
      
      
    
    代码解读

测试模型

复制代码
    def predict(sentence):
    tokenized = TEXT.process([sentence]).to(DEVICE)
    prediction, _ = model(tokenized)
    predicted_index = torch.argmax(prediction).item()
    predicted_label = LABEL.vocab.itos[predicted_index]
    probability = torch.softmax(prediction, dim=1)[0][predicted_index].item()
    return {"label": predicted_label, "probability": probability}
    
    predict("The movie was fantastic!")
    
    >> {'label': 'pos', 'probability': 0.9998920631408691}
    
      
      
      
      
      
      
      
      
      
      
    
    代码解读

全部评论 (0)

还没有任何评论哟~