中文命名实体识别NER
又被称为缩略形式NER( English: Named Entity Recognition ),它涉及对文本中的具有特定意义实体信息的识别。此类任务涵盖如人名地点名称机构名称专有名词等文字内容。目前广泛应用于NER领域的先进模型大多依赖于深度学习或统计学习的方法。本文使用的数据集是2018ACL论文中新浪财经收集的简历数据
数据集链接:https://github.com/jiesutd/LatticeLSTM
标注集合遵循BIOES编码规则(其中B代表实体的起始标记,E代表实体的结束标记,I代表实体内部的标记,O代表非实体区域,S代表单一的实体标记),每条标注信息之间留有适当空白以确保清晰可读

对于命名实体识别其他方法举例


常用的模型以及涉及到的主要代码
1、隐马尔可夫模型(HMM)
隐马尔可夫模型通过一个隐藏的马尔可夫链生成不可见的状态序列,并根据各个状态生成观测数据形成观测序列的过程(李航 统计学习方法)。隐马尔可夫模型由初始状态分布、状态转移概率矩阵以及观测概率矩阵共同确定。无需过分担心定义过于复杂,在实际应用中我们只需要掌握NER本质上是一种序列标注问题(预测每个字的BIOES标记),即利用HMM对NER问题进行建模时所能观察到的是由连续词语组成的序列(即观测序列),而无法直接获得的是每个词语对应的标签序列(即状态序列)。具体而言,在这种情况下HMM的主要组成部分可以被理解为:初始状态分布 即为每个标签作为句子起始词的概率;状态转移概率矩阵 则表示从一个标签转移到另一个标签的概率;观测概率矩阵 则是在给定某个标签时生成特定词语的概率。基于上述三个要素构建的HMM模型能够有效识别词语的情感或类型信息
class HMM(object):
def __init__(self, N, M):
"""Args:
N: 状态数,这里对应存在的标注的种类
M: 观测数,这里对应有多少不同的字
"""
self.N = N
self.M = M
# 状态转移概率矩阵 A[i][j]表示从i状态转移到j状态的概率
self.A = torch.zeros(N, N)
# 观测概率矩阵, B[i][j]表示i状态下生成j观测的概率
self.B = torch.zeros(N, M)
# 初始状态概率 Pi[i]表示初始时刻为状态i的概率
self.Pi = torch.zeros(N)
有了模型定义后的工作就是建立模型架构。HMM的学习过程即为隐马尔可夫模型的学习问题(参考李航《统计学习方法》),实际上就是要根据训练数据运用最大似然估计的方法来推断出三元组中的各个参数值——即初始状态分布、状态转移概率矩阵以及观测概率矩阵这三个参数的具体数值。举个例子来帮助理解,在估算初始状态分布时,在给定的数据集中某个标记作为句子起始标记出现过的次数与总句子数的比例即可近似得到其初始概率值:假如某个标记在数据集中作为句子第一个字出现了k次而总共有N个句子,则其出现的概率约为k/N这个结果看起来相当直观对吧?通过这种方法我们可以估算出三元组的所有参数值进而完成HMM的学习任务如下的代码片段展示了这一过程:其中已经出现过的函数被用省略号代替
class HMM(object):
def __init__(self, N, M):
....
def train(self, word_lists, tag_lists, word2id, tag2id):
"""HMM的训练,即根据训练语料对模型参数进行估计,
因为我们有观测序列以及其对应的状态序列,所以我们
可以使用极大似然估计的方法来估计隐马尔可夫模型的参数
参数:
word_lists: 列表,其中每个元素由字组成的列表,如 ['担','任','科','员']
tag_lists: 列表,其中每个元素是由对应的标注组成的列表,如 ['O','O','B-TITLE', 'E-TITLE']
word2id: 将字映射为ID
tag2id: 字典,将标注映射为ID
"""
assert len(tag_lists) == len(word_lists)
# 估计转移概率矩阵
for tag_list in tag_lists:
seq_len = len(tag_list)
for i in range(seq_len - 1):
current_tagid = tag2id[tag_list[i]]
next_tagid = tag2id[tag_list[i+1]]
self.A[current_tagid][next_tagid] += 1
# 一个重要的问题:如果某元素没有出现过,该位置为0,这在后续的计算中是不允许的
# 解决方法:我们将等于0的概率加上很小的数
self.A[self.A == 0.] = 1e-10
self.A = self.A / self.A.sum(dim=1, keepdim=True)
# 估计观测概率矩阵
for tag_list, word_list in zip(tag_lists, word_lists):
assert len(tag_list) == len(word_list)
for tag, word in zip(tag_list, word_list):
tag_id = tag2id[tag]
word_id = word2id[word]
self.B[tag_id][word_id] += 1
self.B[self.B == 0.] = 1e-10
self.B = self.B / self.B.sum(dim=1, keepdim=True)
# 估计初始状态概率
for tag_list in tag_lists:
init_tagid = tag2id[tag_list[0]]
self.Pi[init_tagid] += 1
self.Pi[self.Pi == 0.] = 1e-10
self.Pi = self.Pi / self.Pi.sum()
模型经过全面训练后,在此基础上运用已获得的模型进行解码。即针对那些尚未见过的新奇句子,我们希望求出每个字符在相应语境下的标记,以实现对该未知文本的理解与解析。该任务所采用的方法是维特比(viterbi)算法。关于该算法的具体数学推导过程,建议参考 李航统计学习方法 这一权威教材中的详细阐述
HMM存在两大缺点:其一为观察值之间完全互不相关;其二为每个字符在整个句子中都是自给自足的
2)在状态转移过程中当前的状态仅与上一时刻的状态相关,并未考虑到后续时刻的状态
HMM代码实现的主要模型部分如下:
import torch
class HMM(object):
def __init__(self, N, M):
"""Args:
N: 状态数,这里对应存在的标注的种类
M: 观测数,这里对应有多少不同的字
"""
self.N = N
self.M = M
# 状态转移概率矩阵 A[i][j]表示从i状态转移到j状态的概率
self.A = torch.zeros(N, N)
# 观测概率矩阵, B[i][j]表示i状态下生成j观测的概率
self.B = torch.zeros(N, M)
# 初始状态概率 Pi[i]表示初始时刻为状态i的概率
self.Pi = torch.zeros(N)
def train(self, word_lists, tag_lists, word2id, tag2id):
"""HMM的训练,即根据训练语料对模型参数进行估计,
因为我们有观测序列以及其对应的状态序列,所以我们
可以使用极大似然估计的方法来估计隐马尔可夫模型的参数
参数:
word_lists: 列表,其中每个元素由字组成的列表,如 ['担','任','科','员']
tag_lists: 列表,其中每个元素是由对应的标注组成的列表,如 ['O','O','B-TITLE', 'E-TITLE']
word2id: 将字映射为ID
tag2id: 字典,将标注映射为ID
"""
assert len(tag_lists) == len(word_lists)
# 估计转移概率矩阵
for tag_list in tag_lists:
seq_len = len(tag_list)
for i in range(seq_len - 1):
current_tagid = tag2id[tag_list[i]]
next_tagid = tag2id[tag_list[i+1]]
self.A[current_tagid][next_tagid] += 1
# 问题:如果某元素没有出现过,该位置为0,这在后续的计算中是不允许的
# 解决方法:我们将等于0的概率加上很小的数
self.A[self.A == 0.] = 1e-10
self.A = self.A / self.A.sum(dim=1, keepdim=True)
# 估计观测概率矩阵
for tag_list, word_list in zip(tag_lists, word_lists):
assert len(tag_list) == len(word_list)
for tag, word in zip(tag_list, word_list):
tag_id = tag2id[tag]
word_id = word2id[word]
self.B[tag_id][word_id] += 1
self.B[self.B == 0.] = 1e-10
self.B = self.B / self.B.sum(dim=1, keepdim=True)
# 估计初始状态概率
for tag_list in tag_lists:
init_tagid = tag2id[tag_list[0]]
self.Pi[init_tagid] += 1
self.Pi[self.Pi == 0.] = 1e-10
self.Pi = self.Pi / self.Pi.sum()
def test(self, word_lists, word2id, tag2id):
pred_tag_lists = []
for word_list in word_lists:
pred_tag_list = self.decoding(word_list, word2id, tag2id)
pred_tag_lists.append(pred_tag_list)
return pred_tag_lists
def decoding(self, word_list, word2id, tag2id):
"""
使用维特比算法对给定观测序列求状态序列, 这里就是对字组成的序列,求其对应的标注。
维特比算法实际是用动态规划解隐马尔可夫模型预测问题,即用动态规划求概率最大路径(最优路径)
这时一条路径对应着一个状态序列
"""
# 问题:整条链很长的情况下,十分多的小概率相乘,最后可能造成下溢
# 解决办法:采用对数概率,这样源空间中的很小概率,就被映射到对数空间的大的负数
# 同时相乘操作也变成简单的相加操作
A = torch.log(self.A)
B = torch.log(self.B)
Pi = torch.log(self.Pi)
# 初始化 维比特矩阵viterbi 它的维度为[状态数, 序列长度]
# 其中viterbi[i, j]表示标注序列的第j个标注为i的所有单个序列(i_1, i_2, ..i_j)出现的概率最大值
seq_len = len(word_list)
viterbi = torch.zeros(self.N, seq_len)
# backpointer是跟viterbi一样大小的矩阵
# backpointer[i, j]存储的是 标注序列的第j个标注为i时,第j-1个标注的id
# 等解码的时候,我们用backpointer进行回溯,以求出最优路径
backpointer = torch.zeros(self.N, seq_len).long()
# self.Pi[i] 表示第一个字的标记为i的概率
# Bt[word_id]表示字为word_id的时候,对应各个标记的概率
# self.A.t()[tag_id]表示各个状态转移到tag_id对应的概率
# 所以第一步为
start_wordid = word2id.get(word_list[0], None)
Bt = B.t()
if start_wordid is None:
# 如果字不再字典里,则假设状态的概率分布是均匀的
bt = torch.log(torch.ones(self.N) / self.N)
else:
bt = Bt[start_wordid]
viterbi[:, 0] = Pi + bt
backpointer[:, 0] = -1
# 递推公式:
# viterbi[tag_id, step] = max(viterbi[:, step-1]* self.A.t()[tag_id] * Bt[word])
# 其中word是step时刻对应的字
# 由上述递推公式求后续各步
for step in range(1, seq_len):
wordid = word2id.get(word_list[step], None)
# 处理字不在字典中的情况
# bt是在t时刻字为wordid时,状态的概率分布
if wordid is None:
# 如果字不再字典里,则假设状态的概率分布是均匀的
bt = torch.log(torch.ones(self.N) / self.N)
else:
bt = Bt[wordid] # 否则从观测概率矩阵中取bt
for tag_id in range(len(tag2id)):
max_prob, max_id = torch.max(
viterbi[:, step-1] + A[:, tag_id],
dim=0
)
viterbi[tag_id, step] = max_prob + bt[tag_id]
backpointer[tag_id, step] = max_id
# 终止, t=seq_len 即 viterbi[:, seq_len]中的最大概率,就是最优路径的概率
best_path_prob, best_path_pointer = torch.max(
viterbi[:, seq_len-1], dim=0
)
# 回溯,求最优路径
best_path_pointer = best_path_pointer.item()
best_path = [best_path_pointer]
for back_step in range(seq_len-1, 0, -1):
best_path_pointer = backpointer[best_path_pointer, back_step]
best_path_pointer = best_path_pointer.item()
best_path.append(best_path_pointer)
# 将tag_id组成的序列转化为tag
assert len(best_path) == len(word_list)
id2tag = dict((id_, tag) for tag, id_ in tag2id.items())
tag_list = [id2tag[id_] for id_ in reversed(best_path)]
return tag_list
2、条件随机场
在之前讨论的HMM模型中涉及两个基本假设:第一个假设认为输出观察值相互独立;第二个假设则认为状态转移仅依赖于前一个状态。具体来说,在命名实体识别的情境中,HMM模型将观测到的句子中的每个字视为相互独立的现象,并且当前标记结果不仅受前一个标记结果的影响,在实际应用中发现这种方法存在局限性。例如,在处理中文命名实体识别时,词语本身的词性和语境信息往往能够提供额外的支持信息;此外,在实际场景中发现当前标记结果不仅受前一个标记结果的影响,并且还与其后续标记结果产生关联关系。由于这些基本假设计实上限制了HMM模型对复杂语言现象的理解能力,在实际应用中其表现往往无法达到预期效果
它无需面对这些问题。通过引入自定义特征函数不仅能够表达观测间的相互依赖关系,并且还可以描述当前观测与其前后多个状态之间的复杂依存关系。从而有效地解决了HMM模型所面临的问题。条件随机场数学公式不在此讲述了。同样采用了维特比算法进行解码
from sklearn_crfsuite import CRF # CRF的具体实现太过复杂,这里我们借助一个外部的库
def word2features(sent, i):
"""抽取单个字的特征"""
word = sent[i]
prev_word = "<s>" if i == 0 else sent[i-1]
next_word = "</s>" if i == (len(sent)-1) else sent[i+1]
# 因为每个词相邻的词会影响这个词的标记
# 所以我们使用:
# 前一个词,当前词,后一个词,
# 前一个词+当前词, 当前词+后一个词
# 作为特征
features = {
'w': word,
'w-1': prev_word,
'w+1': next_word,
'w-1:w': prev_word+word,
'w:w+1': word+next_word,
'bias': 1
}
return features
def sent2features(sent):
"""抽取序列特征"""
return [word2features(sent, i) for i in range(len(sent))]
class CRFModel(object):
def __init__(self,
algorithm='lbfgs',
c1=0.1,
c2=0.1,
max_iterations=100,
all_possible_transitions=False
):
self.model = CRF(algorithm=algorithm,
c1=c1,
c2=c2,
max_iterations=max_iterations,
all_possible_transitions=all_possible_transitions)
def train(self, sentences, tag_lists):
"""训练模型"""
features = [sent2features(s) for s in sentences]
self.model.fit(features, tag_lists)
def test(self, sentences):
"""解码,对给定句子预测其标注"""
features = [sent2features(s) for s in sentences]
pred_tag_lists = self.model.predict(features)
return pred_tag_lists

3、Bi_LSTM_CRF
简单来说,LSTM的优势在于它可以通过双向结构捕捉输入字之间的关联。在训练阶段,LSTM可以根据目标(如实体识别)自动生成观测数据的关键特征。然而,其局限性在于无法理解状态序列(输出标记)之间的联系。值得注意的是,在命名实体识别任务中,标签之间通常存在某种内在联系。“例如,B标签表示某个实体开始,B标签通常不会连续出现。”相比之下,CRF的优势在于它能有效地建模隐含的状态转移。因此,通常的做法是在LSTM之后添加一层CRF层以结合两者的优点
下面是给Bi-LSTM加一层CRF的代码实现:
from itertools import zip_longest
from copy import deepcopy
import torch
import torch.nn as nn
import torch.optim as optim
from .util import tensorized, sort_by_lengths, cal_loss, cal_lstm_crf_loss
from .config import TrainingConfig, LSTMConfig
from .bilstm import BiLSTM
class BILSTM_Model(object):
def __init__(self, vocab_size, out_size, crf=True):
"""功能:对LSTM的模型进行训练与测试
参数:
vocab_size:词典大小
out_size:标注种类
crf选择是否添加CRF层"""
self.device = torch.device(
"cuda" if torch.cuda.is_available() else "cpu")
# 加载模型参数
self.emb_size = LSTMConfig.emb_size
self.hidden_size = LSTMConfig.hidden_size
self.crf = crf
# 根据是否添加crf初始化不同的模型 选择不一样的损失计算函数
if not crf:
self.model = BiLSTM(vocab_size, self.emb_size,
self.hidden_size, out_size).to(self.device)
self.cal_loss_func = cal_loss
else:
self.model = BiLSTM_CRF(vocab_size, self.emb_size,
self.hidden_size, out_size).to(self.device)
self.cal_loss_func = cal_lstm_crf_loss
# 加载训练参数:
self.epoches = TrainingConfig.epoches
self.print_step = TrainingConfig.print_step
self.lr = TrainingConfig.lr
self.batch_size = TrainingConfig.batch_size
# 初始化优化器
self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
# 初始化其他指标
self.step = 0
self._best_val_loss = 1e18
self.best_model = None
def train(self, word_lists, tag_lists,
dev_word_lists, dev_tag_lists,
word2id, tag2id):
# 对数据集按照长度进行排序
word_lists, tag_lists, _ = sort_by_lengths(word_lists, tag_lists)
dev_word_lists, dev_tag_lists, _ = sort_by_lengths(
dev_word_lists, dev_tag_lists)
B = self.batch_size
for e in range(1, self.epoches+1):
self.step = 0
losses = 0.
for ind in range(0, len(word_lists), B):
batch_sents = word_lists[ind:ind+B]
batch_tags = tag_lists[ind:ind+B]
losses += self.train_step(batch_sents,
batch_tags, word2id, tag2id)
if self.step % TrainingConfig.print_step == 0:
total_step = (len(word_lists) // B + 1)
print("Epoch {}, step/total_step: {}/{} {:.2f}% Loss:{:.4f}".format(
e, self.step, total_step,
100. * self.step / total_step,
losses / self.print_step
))
losses = 0.
# 每轮结束测试在验证集上的性能,保存最好的一个
val_loss = self.validate(
dev_word_lists, dev_tag_lists, word2id, tag2id)
print("Epoch {}, Val Loss:{:.4f}".format(e, val_loss))
def train_step(self, batch_sents, batch_tags, word2id, tag2id):
self.model.train()
self.step += 1
# 准备数据
tensorized_sents, lengths = tensorized(batch_sents, word2id)
tensorized_sents = tensorized_sents.to(self.device)
targets, lengths = tensorized(batch_tags, tag2id)
targets = targets.to(self.device)
# forward
scores = self.model(tensorized_sents, lengths)
# 计算损失 更新参数
self.optimizer.zero_grad()
loss = self.cal_loss_func(scores, targets, tag2id).to(self.device)
loss.backward()
self.optimizer.step()
return loss.item()
def validate(self, dev_word_lists, dev_tag_lists, word2id, tag2id):
self.model.eval()
with torch.no_grad():
val_losses = 0.
val_step = 0
for ind in range(0, len(dev_word_lists), self.batch_size):
val_step += 1
# 准备batch数据
batch_sents = dev_word_lists[ind:ind+self.batch_size]
batch_tags = dev_tag_lists[ind:ind+self.batch_size]
tensorized_sents, lengths = tensorized(
batch_sents, word2id)
tensorized_sents = tensorized_sents.to(self.device)
targets, lengths = tensorized(batch_tags, tag2id)
targets = targets.to(self.device)
# forward
scores = self.model(tensorized_sents, lengths)
# 计算损失
loss = self.cal_loss_func(
scores, targets, tag2id).to(self.device)
val_losses += loss.item()
val_loss = val_losses / val_step
if val_loss < self._best_val_loss:
print("保存模型...")
self.best_model = deepcopy(self.model)
self._best_val_loss = val_loss
return val_loss
def test(self, word_lists, tag_lists, word2id, tag2id):
"""返回最佳模型在测试集上的预测结果"""
# 准备数据
word_lists, tag_lists, indices = sort_by_lengths(word_lists, tag_lists)
tensorized_sents, lengths = tensorized(word_lists, word2id)
tensorized_sents = tensorized_sents.to(self.device)
self.best_model.eval()
with torch.no_grad():
batch_tagids = self.best_model.test(
tensorized_sents, lengths, tag2id)
# 将id转化为标注
pred_tag_lists = []
id2tag = dict((id_, tag) for tag, id_ in tag2id.items())
for i, ids in enumerate(batch_tagids):
tag_list = []
if self.crf:
for j in range(lengths[i] - 1): # crf解码过程中,end被舍弃
tag_list.append(id2tag[ids[j].item()])
else:
for j in range(lengths[i]):
tag_list.append(id2tag[ids[j].item()])
pred_tag_lists.append(tag_list)
# indices存有根据长度排序后的索引映射的信息
# 比如若indices = [1, 2, 0] 则说明原先索引为1的元素映射到的新的索引是0,
# 索引为2的元素映射到新的索引是1...
# 下面根据indices将pred_tag_lists和tag_lists转化为原来的顺序
ind_maps = sorted(list(enumerate(indices)), key=lambda e: e[1])
indices, _ = list(zip(*ind_maps))
pred_tag_lists = [pred_tag_lists[i] for i in indices]
tag_lists = [tag_lists[i] for i in indices]
return pred_tag_lists, tag_lists
class BiLSTM_CRF(nn.Module):
def __init__(self, vocab_size, emb_size, hidden_size, out_size):
"""初始化参数:
vocab_size:字典的大小
emb_size:词向量的维数
hidden_size:隐向量的维数
out_size:标注的种类
"""
super(BiLSTM_CRF, self).__init__()
self.bilstm = BiLSTM(vocab_size, emb_size, hidden_size, out_size)
# CRF实际上就是多学习一个转移矩阵 [out_size, out_size] 初始化为均匀分布
self.transition = nn.Parameter(
torch.ones(out_size, out_size) * 1/out_size)
# self.transition.data.zero_()
def forward(self, sents_tensor, lengths):
# [B, L, out_size]
emission = self.bilstm(sents_tensor, lengths)
# 计算CRF scores, 这个scores大小为[B, L, out_size, out_size]
# 也就是每个字对应一个 [out_size, out_size]的矩阵
# 这个矩阵第i行第j列的元素的含义是:上一时刻tag为i,这一时刻tag为j的分数
batch_size, max_len, out_size = emission.size()
crf_scores = emission.unsqueeze(
2).expand(-1, -1, out_size, -1) + self.transition.unsqueeze(0)
return crf_scores
def test(self, test_sents_tensor, lengths, tag2id):
"""使用维特比算法进行解码"""
start_id = tag2id['<start>']
end_id = tag2id['<end>']
pad = tag2id['<pad>']
tagset_size = len(tag2id)
crf_scores = self.forward(test_sents_tensor, lengths)
device = crf_scores.device
# B:batch_size, L:max_len, T:target set size
B, L, T, _ = crf_scores.size()
# viterbi[i, j, k]表示第i个句子,第j个字对应第k个标记的最大分数
viterbi = torch.zeros(B, L, T).to(device)
# backpointer[i, j, k]表示第i个句子,第j个字对应第k个标记时前一个标记的id,用于回溯
backpointer = (torch.zeros(B, L, T).long() * end_id).to(device)
lengths = torch.LongTensor(lengths).to(device)
# 向前递推
for step in range(L):
batch_size_t = (lengths > step).sum().item()
if step == 0:
# 第一个字它的前一个标记只能是start_id
viterbi[:batch_size_t, step,
:] = crf_scores[: batch_size_t, step, start_id, :]
backpointer[: batch_size_t, step, :] = start_id
else:
max_scores, prev_tags = torch.max(
viterbi[:batch_size_t, step-1, :].unsqueeze(2) +
crf_scores[:batch_size_t, step, :, :], # [B, T, T]
dim=1
)
viterbi[:batch_size_t, step, :] = max_scores
backpointer[:batch_size_t, step, :] = prev_tags
# 在回溯的时候我们只需要用到backpointer矩阵
backpointer = backpointer.view(B, -1) # [B, L * T]
tagids = [] # 存放结果
tags_t = None
for step in range(L-1, 0, -1):
batch_size_t = (lengths > step).sum().item()
if step == L-1:
index = torch.ones(batch_size_t).long() * (step * tagset_size)
index = index.to(device)
index += end_id
else:
prev_batch_size_t = len(tags_t)
new_in_batch = torch.LongTensor(
[end_id] * (batch_size_t - prev_batch_size_t)).to(device)
offset = torch.cat(
[tags_t, new_in_batch],
dim=0
) # 这个offset实际上就是前一时刻的
index = torch.ones(batch_size_t).long() * (step * tagset_size)
index = index.to(device)
index += offset.long()
try:
tags_t = backpointer[:batch_size_t].gather(
dim=1, index=index.unsqueeze(1).long())
except RuntimeError:
import pdb
pdb.set_trace()
tags_t = tags_t.squeeze(1)
tagids.append(tags_t.tolist())
# tagids:[L-1](L-1是因为扣去了end_token),大小的liebiao
# 其中列表内的元素是该batch在该时刻的标记
# 下面修正其顺序,并将维度转换为 [B, L]
tagids = list(zip_longest(*reversed(tagids), fillvalue=pad))
tagids = torch.Tensor(tagids).long()
# 返回解码的结果
return tagids
注释:可参考维特比算法的详细讲解如何通俗讲解维特比算法
其他学习连接:
Bi-LSTM-CRF用于序列标注 - 知乎
