【NLP】文本匹配——Simple and Effective Text Matching with Richer Alignment Features(RE2)模型实现
总结
这篇论文介绍了一种名为 RE2 的文本匹配方法,在测试集上的准确率为 0.8391。以下是总结:
方法概述
特征提取
RE2 使用了一维卷积网络(CNN)来提取文本特征:
- 输入为词语序列。
- 卷积核应用于输入序列上。
- 输出经过池化操作(全局最大值池化)并进行预测。
模型组件- 嵌入层:将词语转换为嵌入表示。
- 激活函数:使用Gelu激活函数。
- 卷积层:设计多种卷积核以提高捕捉长距离依赖的能力。
- 残差连接:支持残差连接和增强残差连接两种方式。
- 池化层:全局最大值池化用于降维。
- 预测层:通过全连接网络进行二分类任务。
创新点- 使用CNN替代传统的RNN/LSTM进行特征提取。
- 引入残差连接以改善深度学习模型的稳定性。
- 提供多种预测方式以融合多模态信息。
实现细节
数据处理模块:- 文本切分方式采用字符切分而非词语切分。
- 使用自定义数据集类NLIDataSet加载数据并进行预处理。
- 创建词表并处理mask问题。
模型实现模块:- 改写了常见的嵌入层、激活函数、线性层等类。
- 在卷积操作中引入了多种核大小以增强捕捉能力。
- 提供了三种不同的预测方式以融合多模态信息。
训练封装模块:- 使用PyTorch Lightning框架封装数据加载器和训练过程。
- 在30个epoch后验证集准确率为0.8391。
总结
RE2通过一维卷积网络实现了高效的文本匹配任务,在测试集上的性能表现优异(0.8391)。该方法结合了残差机制和多模态预测策略,在保持简洁的同时提升了准确性。
背景
NLP
NLP
总体而言,这篇介绍不算复杂;但值得注意的是,在复现过程中存在一些细节问题需要特别关注。该方法已提供对应的源码,并可通过以下链接访问:https://github.com/Htring/RE2_TEXT_SIMILARITY_PL。
RE2实现
沿袭以往的实现思路,程序依然分为一下模块:
- 数据处理系统:基于dataloader的高效数据加载方案
- 模型搭建:通过算法构建完整的模型架构
- PyTorch Lightning集成:提供标准化的训练框架
- 模型应用流程:涵盖从训练到部署的完整工作流程
下面就跟着论文中的介绍来实现该模型。
数据处理模块
数据处理模块与以往很相似,这里就不过多介绍了,直接看源码:
import json
import os
from typing import Optional, List, Dict
import pytorch_lightning as pl
import torch
from torch.utils.data import Dataset, DataLoader
import jieba
def jieba_cut(content: str):
return [word for word in jieba.cut(content) if word]
def char_cut(content: str):
return [char for char in list(content) if char]
class NLIDataSet(Dataset):
def __init__(self, data_list, word2index, tag2index, max_length):
self.word2index = word2index
self.tag2index = tag2index
self.max_length = max_length
self.data_list = self._num_data(data_list)
def _num_data(self, data_list):
num_data_list = []
def num_data(sentence):
_num_data = []
for char in sentence:
_num_data.append(self.word2index.get(char))
if len(sentence) > self.max_length:
_num_data = _num_data[: self.max_length]
else:
_num_data = _num_data + [self.word2index.get("<pad>")] * (self.max_length - len(sentence))
return _num_data
for dict_data in data_list:
sentence1, sentence2 = dict_data["sentence1"], dict_data["sentence2"]
sen1_len, sen2_len = len(sentence1), len(sentence2)
# 有一个为空时跳过
if not (sen2_len and sen1_len):
continue
sentence1_num = num_data(sentence1)
sentence2_num = num_data(sentence2)
num_data_list.append([sentence1_num, sentence2_num, self.tag2index.get(dict_data["gold_label"])])
return num_data_list
def __getitem__(self, index):
return self.data_list[index]
def __len__(self):
return len(self.data_list)
class NLIDataModule(pl.LightningDataModule):
def __init__(self, data_dir="corpus/chinese-snli-c", max_length=50, batch_size=3):
super().__init__()
self.data_path = data_dir
self.batch_size = batch_size
self.max_length = max_length
self.train_data_set, self.dev_data_set, self.test_data_set = None, None, None
self.tag2idx, self.token2index = None, None
self.setup()
def _load_data(self, file_path) -> List[Dict]:
data_list = []
with open(file_path, 'r', encoding='utf8') as reader:
for line in reader:
line = line.strip()
if not line:
continue
json_data: dict = json.loads(line)
json_data["sentence1"] = char_cut(json_data["sentence1"])
json_data["sentence2"] = char_cut(json_data["sentence2"])
data_list.append(json_data)
return data_list
def setup(self, stage: Optional[str] = None) -> None:
train_data_list = self._load_data(os.path.join(self.data_path, "train.txt"))
dev_data_list = self._load_data(os.path.join(self.data_path, "dev.txt"))
test_data_list = self._load_data(os.path.join(self.data_path, "test.txt"))
self.char2idx = {"<pad>": 0, "<unk>": 1}
self.tag2idx = {}
for data_list in [train_data_list, dev_data_list, test_data_list]:
for dict_data in data_list:
for words in [dict_data["sentence1"], dict_data["sentence2"]]:
for word in words:
if word not in self.char2idx:
self.char2idx[word] = len(self.char2idx)
if dict_data["gold_label"] not in self.tag2idx:
self.tag2idx[dict_data['gold_label']] = len(self.tag2idx)
self.idx2char = {index: char for char, index in self.char2idx.items()}
self.idx2tag = {index: value for value, index in self.tag2idx.items()}
self.tag_size = len(self.tag2idx)
self.vocab_size = len(self.char2idx)
self.train_data_set = NLIDataSet(train_data_list, self.char2idx, self.tag2idx, self.max_length)
self.dev_data_set = NLIDataSet(dev_data_list, self.char2idx, self.tag2idx, self.max_length)
self.test_data_set = NLIDataSet(test_data_list, self.char2idx, self.tag2idx, self.max_length)
@staticmethod
def collate_fn(batch):
sen1, sen2, y = [], [], []
for simple in batch:
sen1.append(simple[0])
sen2.append(simple[1])
y.append(simple[-1])
sen1_t = torch.tensor(sen1, dtype=torch.long)
sen2_t = torch.tensor(sen2, dtype=torch.long)
y_t = torch.tensor(y, dtype=torch.long)
return sen1_t, sen2_t, y_t
def train_dataloader(self):
return DataLoader(self.train_data_set, batch_size=self.batch_size, collate_fn=self.collate_fn)
def test_dataloader(self):
return DataLoader(self.test_data_set, batch_size=self.batch_size, collate_fn=self.collate_fn)
def val_dataloader(self):
return DataLoader(self.dev_data_set, batch_size=self.batch_size, collate_fn=self.collate_fn)
def save_dict(self, data_dir):
with open(os.path.join(data_dir, "index2tag.txt"), 'w', encoding='utf8') as writer:
json.dump(self.idx2tag, writer, ensure_ascii=False)
with open(os.path.join(data_dir, "token2index.txt"), 'w', encoding='utf8') as writer:
json.dump(self.char2idx, writer, ensure_ascii=False)
AI助手
模型实现模块
鉴于论文中存在多个模块可供选择及处理方案,在源代码中采用了注册机制来获取相关模块。这种做法可被视为一种设计模式值值得借鉴参考。该方法借助了一个装饰器函数来进行操作实现如下:
def register(name=None, registry=None):
"""
将某个函数获这某个类注册到某各地方,装饰器函数
:param name: 注册的函数别名
:param registry: 注册保存的对象
:return: registered fun
"""
def decorator(fn, registration_name=None):
module_name = registration_name or fn.__name__
if module_name in registry:
raise LookupError(f"module {module_name} already registered.")
registry[module_name] = fn
return fn
return lambda fn: decorator(fn, name)
AI助手
较为常见的处理手段是将文本转换为嵌入表示,在现有文献中通常采用基于词嵌入的方法作为输入形式。然而,在现有文献中通常采用词分割作为文本处理的基础,在选择文本切分策略时应充分考虑这些差异。在此阶段我们将其标记为初始状态变量x^{(0)}。此外,在实际应用中我们还可以对嵌入层的结构进行优化和改进。
from collections import OrderedDict
import torch
from torch import nn
import torch.nn.functional as F
class Embedding(nn.Module):
__doc__ = """ 改写的embedding """
def __init__(self, args):
super().__init__()
self.fix_embeddings = args.fix_embeddings
self.embedding = nn.Embedding(args.num_vocab, args.embedding_dim, padding_idx=0)
self.dropout = args.dropout
def set_(self, value):
self.embedding.weight.requires_grad = not self.fix_embeddings
self.embedding.load_state_dict(OrderedDict({'weight': torch.tensor(value)}))
def forward(self, x):
x = self.embedding(x)
x = F.dropout(x, self.dropout, self.training)
return x
AI助手
源码中使用的激活函数也都是Gelu,也对线性函数进行重写,如下:
class GeLU(nn.Module):
__doc__ = """ gelu激活函数 """
def forward(self, x: torch.Tensor) -> torch.Tensor:
return 0.5 * x * (1. + torch.tanh(x * 0.7978845608 * (1. + 0.044715 * x * x)))
class Linear(nn.Module):
__doc__ = """ 改写的Linear层 """
def __init__(self, in_features:int, out_features:int, activations=False):
super().__init__()
linear = nn.Linear(in_features, out_features)
nn.init.normal_(linear.weight, std=math.sqrt((2. if activations else 1.) / in_features))
nn.init.zeros_(linear.bias)
modules = [nn.utils.weight_norm(linear)]
if activations:
modules.append(GeLU())
self.model = nn.Sequential(*modules)
def forward(self, x:torch.Tensor) -> torch.Tensor:
return self.model(x)
AI助手
每一个block的第一个步骤就是通过一个encoder获取该序列的上下文信息。随后将encoder的输入与输出进行连接并传递给对齐层处理。所使用的encoder架构基于多层卷积神经网络设计。对于第n个block来说:
输入:x^{(n)}=(x_1^{(n)},x_2^{(n)},\cdots,x_l^{(n)}),
输出:o^{(n)}=(o_1^{(n)},o_2^{(n)},\cdots,o_l^{(n)})。
其中初始状态o^{(0)}被设定为零向量。
从第二个block开始(即当n≥2时),每个block的输入由以下两部分组成:
x_i^{(n)}=[x_i^{(1)} ; o_i^{(n-1)}+o_i^{(n-2)}]。
为了适应不同大小的卷积核设计并保持卷积后的特征维度一致性,
一维卷积模块在此进行了优化改进,
相关代码实现如下所示:
class Conv1d(nn.Module):
__doc__ = """ 改写的一维卷积 """
def __init__(self, in_channels, out_channels, kernel_sizes: Collection[int]):
super().__init__()
assert all(k % 2 == 1 for k in kernel_sizes), 'only support odd kernel sizes'
assert out_channels % len(kernel_sizes) == 0, 'out channels must be dividable by kernels'
out_channels = out_channels // len(kernel_sizes)
convs = []
for kernel_size in kernel_sizes:
conv = nn.Conv1d(in_channels,
out_channels,
kernel_size,
padding=(kernel_size - 1) // 2)
nn.init.normal_(conv.weight, std=math.sqrt(2. / (in_channels * kernel_size)))
nn.init.zeros_(conv.bias)
convs.append(nn.Sequential(nn.utils.weight_norm(conv), GeLU()))
self.model = nn.ModuleList(convs)
def forward(self, x):
return torch.cat([encoder(x) for encoder in self.model], dim=-1)
AI助手
编码器实现如下:
class Encoder(nn.Module):
__doc__ = """ 编码器 """
def __init__(self, args, input_size):
super().__init__()
self.dropout = args.dropout
self.encoders = nn.ModuleList(
[
Conv1d(in_channels=input_size if i == 0 else args.hidden_size,
out_channels=args.hidden_size,
kernel_sizes=args.kernel_sizes) for i in range(args.enc_layers)
]
)
def forward(self, x: torch.Tensor, mask: torch.Tensor):
x = x.transpose(1, 2) # BxCxL
mask = mask.transpose(1, 2)
for i, encoder in enumerate(self.encoders):
x.masked_fill_(~mask, 0.)
if i > 0:
x = F.dropout(x, self.dropout, self.training)
x = encoder(x)
x = F.dropout(x, self.dropout, self.training)
return x.transpose(1, 2) # BxLxC
AI助手
需要强调的是,在第一个层的输入维度相较于后面的层有所不同。 文中在多个地方采用了dropout技术以防止模型过拟合。
然后采用多样化的串联方式依次连接这些独立且结构相同的block来进行特征提取。其中每个block都是独立且具有相同结构的,在此过程中它们会被串行处理并提取相应的特征信息。不同block之间可以选择不同的连接策略以优化整体模型的表现。
- 残差连接
- 增强残差连接
实现代码如下:
import math
import torch
from torch import nn
from functools import partial
from .utils import register
from . import Linear
registry = {}
register = partial(register, registry=registry)
@register('none')
class NullConnection(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x, _, __):
return x
@register("residual")
class Residual(nn.Module):
def __init__(self, args):
super().__init__()
self.linear = Linear(args.embedding_dim, args.hidden_size)
def forward(self, x: torch.Tensor, res: torch.Tensor, index: int):
if index == 1:
res = self.linear(res)
return (x + res) * math.sqrt(0.5)
@register('aug')
class AugmentedResidual(nn.Module):
def __init__(self, _):
super().__init__()
def forward(self, x: torch.Tensor, res: torch.Tensor, index: int):
if index == 1:
return torch.cat([x, res], dim=-1) # res is embedding
hidden_size = x.size(-1)
x = (res[:, :, : hidden_size] + x) * math.sqrt(0.5)
return torch.cat([x, res[:, :, hidden_size:]], dim=-1) # latter half of res is embedding
AI助手
传统的残差连接需通过线性变换将输入特征映射至与增强型残差连接兼容的空间,并可借助简单的全连接层完成这一过程。值得注意的是,在第一个block之后才建立相邻block之间的链接关系。此外,在构建拼接向量时,默认将其置于末尾位置,并且在该过程中默认会进行均方平均操作以减少信息损失程度。
编码结束之后就是一个重要环节,语义特征对齐。
alignment layer 对齐层
对齐层负责处理两个sentence,并采用类似于注意力机制的方式计算其特征关联。随后由融合层将 alignment 的输入和输出融合到一起。值得注意的是,在评估两段语句的相关性时有两种主要方法可选:一种是Identity函数, 另一种是单层前馈网络(Fully Connected Network, FCN)。其中, Identity函数的工作原理是通过向量间的点积来进行比较;而单层前馈网络则可以通过构建一个全连接网络来实现相似度的衡量功能。对于对齐过程而言, 其核心目标是实现两个sentence间的精确匹配, 对于每一个待对齐的 sentence 输入来说, 需要满足特定的需求条件
import math
import torch
from torch import nn
import torch.nn.functional as F
from functools import partial
from .utils import register
from . import Linear
registry = {}
# 将register中registry参数值固定为registry
register = partial(register, registry=registry)
@register("identity")
class Alignment(nn.Module):
def __init__(self, args, _):
super().__init__()
self.temperature = nn.Parameter(torch.tensor(1 / math.sqrt(args.hidden_size)))
def _attention(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
return torch.matmul(a, b.transpose(1, 2)) * self.temperature
def forward(self, a: torch.Tensor, b: torch.Tensor, mask_a: torch.Tensor, mask_b: torch.Tensor):
attention = self._attention(a, b)
mask = torch.matmul(mask_a.float(), mask_b.transpose(1, 2).float())
mask = mask.bool()
attention.masked_fill_(~mask, -1e4)
attention_a = F.softmax(attention, dim=1)
attention_b = F.softmax(attention, dim=2)
feature_a = torch.matmul(attention_b, b)
feature_b = torch.matmul(attention_a, a)
return feature_a, feature_b
@register("linear")
class MappedAlignment(Alignment):
def __init__(self, args, input_size):
super().__init__(args, input_size)
self.projection = nn.Sequential(
nn.Dropout(args.dropout),
Linear(input_size, args.hidden_size, activations=True)
)
def _attention(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
a = self.projection(a)
b = self.projection(b)
return super()._attention(a, b)
AI助手
其中在计算attention权重时还进行了缩放操作。
通过操作获得对齐后的特征向量后,则必须将未对齐的原始向量与经过处理后的结果进行结合。
fusion layer 融合层
其输出结果本质上相当于一个单独的block结构,在融合机制中扮演着关键角色。这种输出结果可能直接传递给后继的block单元作为输入信号,或者被系统设计为池化层(Pooling Layer)所接收的具体特征向量输入。
import torch
from torch import nn
from functools import partial
from .utils import register
from . import Linear
import torch.nn.functional as F
registry = {}
register = partial(register, registry=registry)
@register('simple')
class Fusion(nn.Module):
def __init__(self, args, input_size):
super().__init__()
self.fusion = Linear(input_size * 2, args.hidden_size, activations=True)
def forward(self, x, align):
return self.fusion(torch.cat([x, align], dim=-1))
@register('full')
class FulFusion(nn.Module):
def __init__(self, args, input_size):
super().__init__()
self.dropout = args.dropout
self.fusion1 = Linear(input_size*2, args.hidden_size, activations=True)
self.fusion2 = Linear(input_size*2, args.hidden_size, activations=True)
self.fusion3 = Linear(input_size*2, args.hidden_size, activations=True)
self.fusion = Linear(args.hidden_size * 3, args.hidden_size, activations=True)
def forward(self, x: torch.Tensor, align: torch.Tensor):
g1 = self.fusion1(torch.cat([x, align], dim=-1))
g2 = self.fusion2(torch.cat([x, x - align], dim=-1))
g3 = self.fusion3(torch.cat([x, x * align], dim=-1))
g = F.dropout(torch.cat([g1, g2, g3], dim=-1), self.dropout, self.training)
return self.fusion(g)
AI助手
在融合过程中必须注意各个block在融合接收过程中的输入与其对应的数据维度是否存在差异
pooling layer 池化层
池化操作则会将输入经过处理后生成定长向量,并传递给预测层。这一过程较为直接,请看具体步骤。
from torch import nn
import torch
class Pooling(nn.Module):
def forward(self, x:torch.Tensor, mask: torch.Tensor):
return x.masked_fill_(~mask, -float('inf')).max(dim=1)[0]
AI助手
在经过特征向量池化处理后,可以直接将两个语句的特征向量输入到预测层进行判断;当语句之间的特征完成提取工作后,则开始对这两个语句进行预测。
prediction layer 预测层
通过处理各层计算所得的特征向量,在构建一个预测层后即可完成一个分类任务。请注意,在上述方法中我们采用了多种向量拼接方式进行预测,请参考下面提供的代码实现。
import torch
from torch import nn
from functools import partial
from . import Linear
from .utils import register
registry = {}
register = partial(register, registry=registry)
@register('simple')
class Prediction(nn.Module):
def __init__(self, args, input_features=2):
super().__init__()
self.dense = nn.Sequential(
nn.Dropout(args.dropout),
Linear(args.hidden_size * input_features, args.hidden_size, activations=True),
nn.Dropout(args.dropout),
Linear(args.hidden_size, args.num_classes),
)
def forward(self, a: torch.Tensor, b: torch.Tensor):
return self.dense(torch.cat([a, b], dim=-1))
@register('full')
class AdvancedPrediction(Prediction):
def __init__(self, args):
super().__init__(args, input_features=4)
def forward(self, a: torch.Tensor, b: torch.Tensor):
return self.dense(torch.cat([a, b, a - b, a * b], dim=-1))
@register('symmetric')
class SymmetricPrediction(AdvancedPrediction):
def forward(self, a: torch.Tensor, b: torch.Tensor):
return self.dense(torch.cat([a, b, (a - b).abs(), a * b], dim=-1))
AI助手
pytorch_lightning 训练封装模块
以往的程序已经做过很多介绍,这里就不在赘述了,查看源码即可。
模型训练和使用模块
在模型训练过程中,默认采用了不采用预训练字符向量这一策略,并经过30个epoch的持续训练后,在测试集上取得了显著的效果。
Testing: 100%|██████████| 42/42 [00:17<00:00, 2.49it/s]
precision recall f1-score support
0 0.77 0.95 0.85 6250
1 0.93 0.72 0.81 6250
accuracy 0.83 12500
macro avg 0.85 0.83 0.83 12500
weighted avg 0.85 0.83 0.83 12500
--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'accuracy': 0.8340799808502197,
'f1_score': 0.8340800404548645,
'recall': 0.8340799808502197,
'val_loss': 0.5524728894233704}
--------------------------------------------------------------------------------
Testing: 100%|██████████| 42/42 [00:18<00:00, 2.28it/s]
AI助手
尽管acc在0.834至0.839之间存在微小差距,但整体上实现了复现。此外,原论文中缺乏针对该语料的比较基准。
总结
本文整体架构较为合理,并且其模型架构与ESIM存在一定程度的相似性。其中,在文献中提到,在本研究中采用的是卷积神经网络(CNN)技术进行特征提取。值得注意的是,在本研究之外,在深入阅读该研究的过程中会发现理论部分相对完善,并且在实现过程中所涉及的一些技巧细节同样值得关注。这篇综述性文章发表于2019年,并且值得注意的是,在文献中提到,在本研究中采用的是卷积神经网络(CNN)技术同时,在文献中提到,在本研究中采用的是卷积神经网络(CNN)技术
