VIDHOP, viral host prediction with Deep Learning — paper reading notes
github : https://github.com/flomock/vidhop
Abstract
Zoonoses are diseases that can be transmitted between animals and humans (and then spread from person to person); Zika virus, Ebola virus and the virus behind COVID-19 all fall into this category. To address the challenges posed by their global spread, this work proposes a genome-sequence-based method for predicting the host of a virus (the input X is a set of viral nucleotide sequences, the label Y the corresponding host). A dedicated evaluation measure, an averaged prediction accuracy (APR), is introduced to judge performance on the host-classification task. With an additional dynamic sample-weighting scheme, the results indicate that the approach transfers well across species: the per-class accuracy fluctuates noticeably, but the overall classification accuracy remains stable when the method is applied to other virus-host problems.
Method
Model overview

As the figure shows, the required data are first obtained from public sequence databases. Concretely, all nucleotide sequences of influenza A virus, rabies virus and rotavirus A that carry a host annotation were collected from the European Nucleotide Archive (ENA), and the host labels were curated with the taxonomy provided by the National Center for Biotechnology Information (NCBI). The data are then preprocessed, fed into the model, and the predictions are analysed. The rest of this section walks through the model's workflow.
Data preprocessing
The first thing to note in preprocessing is that the viral genome sequences all have different lengths, which has to be dealt with before they can be fed into the network.

To bring the data set to a uniform length, the sequences are trimmed as illustrated in the figure: the sequences are sorted by length and the 95th percentile is taken as the cut-off, so only the longest 5% of sequences are truncated. This cut-off then serves as the common input length.
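As a minimal sketch of this idea (a hypothetical helper, not code from the repository), the cut-off length could be computed like this:
import numpy as np

def percentile_cut_length(sequences, q=95):
    # length below which 95% of all sequences fall; only the longest 5% get truncated
    lengths = np.array([len(s) for s in sequences])
    return int(np.percentile(lengths, q))

seqs = ["ACGT" * n for n in (10, 20, 30, 400)]
print(percentile_cut_length(seqs))  # the common input length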
The remaining problem is that many sequences are shorter than this target length; training on naively padded data makes it hard for the model to learn (underfitting), so the paper compares several strategies for extending short sequences:
- Simple repeat: the original sequence is concatenated with itself until the target length is reached.
- Repeat with spacer: as above, but placeholder symbols are inserted between the copies to mark the junctions.
- Random repeat: copies starting at a random position of the sequence (wrapping around) are appended until the target length is reached.
- Random repeat with spacer: the combination of the two previous variants, with placeholder symbols inserted between the randomly started copies.
- Append gaps: placeholder symbols are appended to the end of the sequence until the target length is reached.
- Cut: all sequences are truncated to the length of the shortest sequence.
After this preprocessing, the sequences are additionally split into shorter subsequences and processed batch-wise to speed up training; the per-subsequence outputs are later combined into the final prediction. The bases themselves are one-hot encoded into numeric vectors.
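A minimal one-hot encoding sketch over the alphabet "ATGCN-" (where "-" is the placeholder/spacer symbol); the repository itself uses sklearn's LabelEncoder plus Keras' to_categorical, shown further below:
import numpy as np

ALPHABET = "ATGCN-"
CHAR_TO_INDEX = {c: i for i, c in enumerate(ALPHABET)}

def one_hot(seq):
    mat = np.zeros((len(seq), len(ALPHABET)), dtype=bool)
    for pos, base in enumerate(seq.upper()):
        # anything outside the alphabet is mapped to N, like the regex substitution in the repo
        mat[pos, CHAR_TO_INDEX.get(base, CHAR_TO_INDEX["N"])] = True
    return mat

print(one_hot("ACGTN-").shape)  # (6, 6): positions x channels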
The paper also proposes an "online" variant, in which the data are not extended once before training but are manipulated on the fly during training. The paper only mentions this briefly; the concrete procedure can be found in the code.
Before training, a fixed fraction of the data is split off for validation and testing; in every epoch a different subset, smaller than the remaining training data, is drawn for training. This procedure, called repeated random undersampling, balances the class distribution by reducing the number of samples drawn from over-represented classes.
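A minimal sketch of the idea (a hypothetical helper; the repository implements it inside DataGenerator.on_epoch_end, discussed later):
import random
from collections import defaultdict

def undersample_per_epoch(samples, labels, n_per_class):
    # draw the same number of samples from every class; calling this once per epoch
    # means over-represented classes contribute a different subset each time
    by_class = defaultdict(list)
    for s, y in zip(samples, labels):
        by_class[y].append(s)
    picked, picked_labels = [], []
    for y, members in by_class.items():
        chosen = random.sample(members, n_per_class)
        picked.extend(chosen)
        picked_labels.extend([y] * n_per_class)
    return picked, picked_labels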
DNN architectures
The paper introduces two deep neural network architectures: a pure LSTM model and a model that combines convolutional layers (CNN) with LSTMs. The LSTM model consists of three LSTM layers followed by two fully connected layers; the CNN+LSTM model places convolutional layers in front of the LSTM layers and again ends in fully connected layers, with a final dense layer performing the classification. For long inputs, the subsequence splitting described above is needed to keep training efficient and accurate.
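A rough Keras sketch of the pure LSTM variant as described above (three stacked LSTM layers plus two dense layers); the node count, dropout and number of host classes are placeholders, not necessarily the exact values used in the repository:
import tensorflow as tf

def build_lstm_model(timesteps, features=6, nodes=150, n_classes=6, dropout=0.2):
    model = tf.keras.models.Sequential([
        tf.keras.layers.LSTM(nodes, input_shape=(timesteps, features),
                             return_sequences=True, dropout=dropout),
        tf.keras.layers.LSTM(nodes, return_sequences=True, dropout=dropout),
        tf.keras.layers.LSTM(nodes, dropout=dropout),
        tf.keras.layers.Dense(nodes, activation="relu"),
        tf.keras.layers.Dense(n_classes, activation="softmax"),  # one output per host
    ])
    model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["acc"])
    return model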
Post-processing of the predictions
Because long genes are fed in as subsequences, the model returns one prediction per subsequence; these have to be combined so that the result reflects the whole gene. The paper compares several aggregation schemes (a small sketch of the vote and mean variants follows the list):
- No aggregation: the prediction of every subsequence is reported as is.
- Vote: every subsequence votes for its predicted class and the majority wins; because it ignores the probabilities, the result is rather coarse.
- Mean: the class probabilities of all subsequences are averaged and the class with the highest mean is chosen.
- Std-div: a weighted variant of Mean in which each subsequence is weighted by the standard deviation of its predicted class probabilities.
The final label of a gene is the class selected by whichever aggregation scheme is used.
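A minimal sketch of the vote and mean schemes (not the repository's calc_predictions), operating on the per-subsequence softmax outputs of one gene:
import numpy as np

def vote_prediction(subseq_probs):             # subseq_probs: (n_subseq, n_classes)
    votes = np.argmax(subseq_probs, axis=1)    # class chosen by each subsequence
    return np.bincount(votes, minlength=subseq_probs.shape[1]).argmax()

def mean_prediction(subseq_probs):
    return subseq_probs.mean(axis=0).argmax()  # average the class probabilities, then pick the max

probs = np.array([[0.7, 0.2, 0.1],
                  [0.4, 0.5, 0.1],
                  [0.6, 0.3, 0.1]])
print(vote_prediction(probs), mean_prediction(probs))  # 0 0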
Experimental results
Overview
Both models were trained on each of the three data sets. Training ran for at most 1500 epochs and was stopped early if the validation accuracy had not improved for 300 epochs. The rotavirus A data set contains roughly 40,000 sequences distributed over six host classes, so the expected random accuracy is about 16.67%; on this set the two models reach 85.28% and 82.88% accuracy. The influenza A data set covers 36 host classes, which pushes the random baseline far lower, and the measured accuracy drops to roughly 50%. In short, the more hosts there are, the lower the random baseline and the harder exact host prediction becomes.
The results are summarized in the table below.

Across the sequence-extension variants, simple repeat with spacers performs best, followed closely by random repeat with spacers; the remaining variants are clearly weaker.
To account for the number of classes, the paper defines a normalized accuracy:
\text{average accuracy} = \frac{2 \cdot \text{accuracy} + |\text{classes}| - 2}{|\text{classes}|}
Applying this formula yields markedly higher scores, as shown below.
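As a quick sanity check, plugging the rotavirus A numbers reported above (accuracy ≈ 0.85, six host classes) into the formula:
def average_accuracy(accuracy, n_classes):
    return (2 * accuracy + n_classes - 2) / n_classes

print(round(average_accuracy(0.85, 6), 3))  # 0.95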

See the original paper for the comparison with other models.
Summary
The paper develops a pipeline that turns viral nucleotide sequences into data for training a host-prediction model and evaluates it on several data sets. The experiments show that accurate host prediction becomes markedly harder as the number of classes grows. The network architectures themselves are fairly basic and do not use the most recent techniques. The main contributions are twofold: first, a detailed account of how trimmed genome sequences are further processed into analysable model input; second, the idea of applying NLP-style sequence methods to genomes in order to predict the host of a given sequence, which is particularly relevant against the backdrop of the current COVID-19 pandemic.
Update 2021-07-12
Experimental pipeline and code walkthrough
Code structure
vidhop
|-- DataParsing
| `-- DataParsing_main.py
|-- cli.py
|-- training
| |-- DataGenerator.py
| |-- make_dataset_out
| | |-- X_test.csv
| | |-- X_train.csv
| | |-- X_val.csv
| | |-- Y_test.csv
| | |-- Y_train.csv
| | `-- Y_val.csv
| |-- make_datasets.py
| `-- train_new_model.py
|-- vidhop_main.py
`-- weights
|-- influ_weights.best.acc.normal_repeat_spacer_run2.hdf5
|-- rabies_weights.best.acc.random_repeat_run2_design_7.hdf5
`-- rota_weights.best.acc.online_design_7.hdf5
The overall project layout is shown above, where:
- The DataParsing module handles data preprocessing.
- DataGenerator.py generates the data needed during training.
- make_datasets.py extracts the training, validation and test splits from the data set and stores them as CSV files.
- train_new_model.py contains the complete training pipeline and the associated data handling.
- vidhop_main.py loads a model and runs the predictions.
- The weights folder holds the pretrained weights of the published models.
Data preprocessing — DataParsing_main.py
class CircularList(list): # a list with wrap-around (circular) indexing
def __getitem__(self, x):
if isinstance(x, slice):
return [self[x] for x in self._rangeify(x)]
index = operator.index(x)
try:
return super().__getitem__(index % len(self))
except ZeroDivisionError:
raise IndexError('list index out of range')
def _rangeify(self, slice):
start, stop, step = slice.start, slice.stop, slice.step
if start is None:
start = 0
if stop is None:
stop = len(self)
if step is None:
step = 1
return range(start, stop, step)
CircularList provides wrap-around slicing; it is used below to repeat a sequence starting from a random position.
def encode_string(maxLen=None, x=[], y=[], y_encoder=None, repeat=True, use_spacer=False, online_Xtrain_set=False,
randomrepeat=False):
"""
One hot encoding for classes
to convert the "old" exported int data via OHE to binary matrix
http://machinelearningmastery.com/multi-class-classification-tutorial-keras-deep-learning-library/
for dna only to int values
"""
def pad_n_repeat_sequences(sequences, maxlen=None, dtype='int32',
padding='post', truncating='post', value=0.):
"""extended version of pad_sequences()"""
if not hasattr(sequences, '__len__'):
raise ValueError('`sequences` must be iterable.')
lengths = []
for x in sequences:
if not hasattr(x, '__len__'):
raise ValueError('`sequences` must be a list of iterables. '
'Found non-iterable: ' + str(x))
lengths.append(len(x))
num_samples = len(sequences)
if maxlen is None:
maxlen = np.max(lengths) # lengths holds the length of every sequence; the maximum becomes the padding target
# take the sample shape from the first non empty sequence
# checking for consistency in the main loop below.
sample_shape = tuple()
for s in sequences:
if len(s) > 0:
sample_shape = np.asarray(s).shape[1:]
break
# make new array and fill with input seqs
x = (np.ones((num_samples, maxlen) + sample_shape) * value).astype(dtype) # allocate the output array of shape (num_samples, maxlen) + sample_shape, pre-filled with the padding value
for idx, s in enumerate(sequences):
if not len(s):
continue # empty list/array was found
if truncating == 'pre': # keep the last maxlen elements (truncate at the front)
trunc = s[-maxlen:]
elif truncating == 'post': # keep the first maxlen elements (truncate at the end)
trunc = s[:maxlen]
else:
raise ValueError('Truncating type "%s" not understood' % truncating)
# check `trunc` has expected shape
trunc = np.asarray(trunc, dtype=dtype)
if trunc.shape[1:] != sample_shape:
raise ValueError(
'Shape of sample %s of sequence at position %s is different from expected shape %s' %
(trunc.shape[1:], idx, sample_shape))
if repeat:
# repeat seq multiple times
repeat_seq = np.array([], dtype=dtype)
while len(repeat_seq) < maxLen:
if use_spacer:
spacer_length = random.randint(1, 50)
spacer = [value for i in range(spacer_length)]
repeat_seq = np.append(repeat_seq, spacer)
if randomrepeat:
random_start = random.randint(0, len(trunc))
repeat_seq = np.append(repeat_seq,
CircularList(trunc)[random_start:random_start + len(trunc)])
## sequence + spacer + sequence
# append a copy that starts at a random position (circular slice)
else:
repeat_seq = np.append(repeat_seq, trunc)
else:
if randomrepeat:
random_start = random.randint(0, len(trunc))
repeat_seq = np.append(repeat_seq,
CircularList(trunc)[random_start:random_start + len(trunc)])
else:
repeat_seq = np.append(repeat_seq, trunc)
x[idx, :] = repeat_seq[-maxLen:]
else:
if padding == 'post':
x[idx, :len(trunc)] = trunc
elif padding == 'pre':
x[idx, -len(trunc):] = trunc
else:
raise ValueError('Padding type "%s" not understood' % padding)
return x
# ↑ end of the padding / repetition helper
encoder = LabelEncoder()
if len(x) > 0:
a = "ATGCN-"
encoder.fit(list(a)) # fit maps the 6 symbols in a to the integers 0-5
out = []
if type(x)==str:
dnaSeq = re.sub(r"[^ACGTUacgtu]", 'N', x)
encoded_X = encoder.transform(list(dnaSeq)) # transform converts the raw sequence into its integer encoding
out.append(encoded_X)
else:
for i in x:
dnaSeq = re.sub(r"[^ACGTUacgtu]", 'N', i)
# dnaSeq = i[0]
encoded_X = encoder.transform(list(dnaSeq))
out.append(encoded_X)
if online_Xtrain_set:
X_train_categorial = []
for seq in out:
X_train_categorial.append(np.array(to_categorical(seq, num_classes=len(a)), dtype=np.bool))
return X_train_categorial
else:
out = pad_n_repeat_sequences(out, maxlen=maxLen, dtype='int16', truncating='pre', value=0)
return np.array(to_categorical(out, num_classes=len(a)), dtype=np.bool)
else:
if y_encoder != None:
encoder.fit(y)
if np.array(encoder.classes_ != y_encoder.classes_).all():
warning(f"Warning not same classes in training and test set")
useable_classes = set(encoder.classes_).intersection(y_encoder.classes_) # classes present in both the training and the test labels
try:
assert np.array(encoder.classes_ == y_encoder.classes_).all()
except AssertionError:
warning(
f"not all test classes in training data, only {useable_classes} predictable "
f"from {len(encoder.classes_)} different classes\ntest set will be filtered so only predictable"
f" classes are included")
try:
assert len(useable_classes) == len(encoder.classes_) # check that every test class also occurs in the training classes
except AssertionError:
print(f"not all test classes in training data, only " \
f"{useable_classes} predictable from {len(encoder.classes_)} different classes" \
f"\ntest set will be filtered so only predictable classes are included")
if not len(useable_classes) == len(encoder.classes_):
global X_test, Y_test
arr = np.zeros(X_test.shape[0], dtype=int)
for i in useable_classes:
arr[y == i] = 1
X_test = X_test[arr == 1, :]
y = y[arr == 1]
encoded_Y = y_encoder.transform(y)
else:
encoded_Y = encoder.transform(y)
return to_categorical(encoded_Y, num_classes=len(y_encoder.classes_))
else:
encoder.fit(y)
# print(encoder.classes_)
# print(encoder.transform(encoder.classes_))
encoded_Y = encoder.transform(y)
return to_categorical(encoded_Y), encoder
encode_string first determines the target length (the longest sequence in the data set, unless maxLen is given), then extends every shorter sequence according to the chosen options: plain repetition, repetition with spacer symbols, or repetition starting from a random position. The padded integer sequences are finally one-hot encoded over the alphabet "ATGCN-"; the same function also one-hot encodes the host labels when called with y.
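A hypothetical usage sketch (the import path and argument values are assumptions for illustration; check the repository for the actual defaults):
from vidhop.DataParsing import DataParsing_main

X = ["ACGTACGTGGTT", "ACGT"]
# extend both sequences to length 20 by repetition with spacer symbols, then one-hot encode
encoded = DataParsing_main.encode_string(maxLen=20, x=X, repeat=True,
                                         use_spacer=True, randomrepeat=False)
print(encoded.shape)  # expected (2, 20, 6): samples x positions x one-hot channels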
def calc_shrink_size(seqlength):
subSeqlength = 100
for i in range(100, 400):
if (seqlength % i == 0):
subSeqlength = i
batch_size = int(seqlength / subSeqlength)
return subSeqlength, batch_size
def shrink_timesteps(X, Y, input_subSeqlength=0):
"""
needed for Truncated Backpropagation Through Time
If you have long input sequences, such as thousands of timesteps,
you may need to break the long input sequences into multiple contiguous subsequences.
e.g. 100 subseq.
Care would be needed to preserve state across each 100 subsequences and reset
the internal state after each 100 samples either explicitly or by using a batch size of 100.
:param input_subSeqlength: set for specific subsequence length
:return:
"""
# assert input_subSeqlength != 0, "must provide variable \"input_subSeqlength\" when using shrink_timesteps for specific subset"
if len(X.shape) == 3:
seqlength = X.shape[1]
features = X.shape[-1]
if input_subSeqlength == 0:
subSeqlength, batch_size = calc_shrink_size(seqlength)
else:
subSeqlength = input_subSeqlength
batch_size = int(seqlength / subSeqlength)
newSeqlength = int(seqlength / subSeqlength) * subSeqlength
bigarray = []
for sample in X:
sample = np.array(sample[0:newSeqlength], dtype=np.bool)
subarray = sample.reshape((int(seqlength / subSeqlength), subSeqlength, features))
bigarray.append(subarray)
bigarray = np.array(bigarray) # stack the subsequence arrays of all samples in the batch
X = bigarray.reshape((bigarray.shape[0] * bigarray.shape[1], bigarray.shape[2], bigarray.shape[3]))
elif len(X.shape) == 2:
seqlength = X.shape[0]
features = X.shape[-1]
if input_subSeqlength == 0:
subSeqlength, batch_size = calc_shrink_size(seqlength)
else:
subSeqlength = input_subSeqlength
batch_size = int(seqlength / subSeqlength)
newSeqlength = int(seqlength / subSeqlength) * subSeqlength
sample = np.array(X[0:newSeqlength], dtype=np.bool)
subarray = sample.reshape((int(seqlength / subSeqlength), subSeqlength, features))
X = np.array(subarray)
else:
assert len(X.shape) == 2 or len(
X.shape) == 3, f"wrong shape of input X, expect len(shape) to be 2 or 3 but is instead {len(X.shape)}"
y = []
for i in Y:
y.append(int(seqlength / subSeqlength) * [i])
Y = np.array(y)
if len(Y.shape) == 2:
Y = np.array(y).flatten()
elif len(Y.shape) == 3:
Y = Y.reshape((Y.shape[0] * Y.shape[1], Y.shape[2]))
return X, Y, batch_size
When a sequence exceeds the chosen subsequence length, shrink_timesteps splits it into contiguous pieces (needed for truncated backpropagation through time); the pieces of one sample stay together within a batch and the sample's label is copied to every piece.
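A minimal numpy illustration of this splitting (stand-in data, not the repository's shrink_timesteps):
import numpy as np

seq = np.random.rand(1200, 6) > 0.5     # stand-in for one one-hot encoded sequence
sub_len = 400                           # chosen subsequence length
n_sub = seq.shape[0] // sub_len
subseqs = seq[:n_sub * sub_len].reshape(n_sub, sub_len, seq.shape[1])
print(subseqs.shape)                    # (3, 400, 6); the label is replicated for each piece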
Data generator — DataGenerator.py
As a first step, make_datasets.py splits the input data into X_train/val/test.csv and Y_train/val/test.csv according to the chosen ratios. The script is straightforward, so it is not analysed further here; it is simply run to produce the files needed for the following steps.
def __data_generation(self, list_IDs_temp, indexes):
pool = multiprocessing.pool.ThreadPool()
'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
# Initialization
# X = np.empty((self.batch_size, self.dim, self.n_channels),dtype='str')
X = np.empty((self.number_samples_per_batch), dtype=object)
Y = np.empty((self.number_samples_per_batch), dtype=int)
sample_weight = np.array([])
def load_csv(sample):
X_i = pd.read_csv(os.path.join(self.directory, sample), delimiter='\t', dtype='str', header=None)[1].values[0]
return X_i
# Generate data
samples = pool.map(load_csv,list_IDs_temp)
X = np.array(samples)
for i, ID in enumerate(list_IDs_temp):
# Store sample
# load tsv, parse to numpy array, get str and set as value in X[i]
# X[i] = pd.read_csv(os.path.join(self.directory, ID), delimiter='\t', dtype='str', header=None)[1].values[0]
# sample_weight = np.append(sample_weight, 1)
# if len(X[i]) < self.dim:
# X[i] = "-" * self.dim
# sample_weight[i] = 0
# Store class
Y[i] = self.labels[indexes[i]]
sample_weight = np.array([[i] * self.number_subsequences for i in sample_weight]).flatten()
if self.maxLen == None:
maxLen = self.number_subsequences * self.dim
else:
maxLen = self.maxLen
# original_length = 50
# start_float = (original_length - self.sequence_length) / 2
# start = math.floor(start_float)
# stop = original_length - math.ceil(start_float)
# # amino = "GALMFWKQESPVICYHRNDTU"
# amino = "GALMFWKQESPVICYHRNDTUOBZX"
# encoder = LabelEncoder()
# encoder.fit(list(amino))
# X = parse_amino(x=[[i[start:stop]] for i in X], encoder=encoder)
# X = self.elmo_embedder.elmo_embedding(X, start, stop)
#
# X = seqvec.embed_sentence([i[start:stop] for i in X])
def encode_sample(sample):
X_i = DataParsing_main.encode_string(maxLen=maxLen, x=str(sample), repeat=self.repeat, use_spacer=self.use_spacer)
return X_i
X_wrong_shape = np.array(pool.map(encode_sample,X))
X = np.array(X_wrong_shape).reshape((X_wrong_shape.shape[0],-1,6))
# X = DataParsing.encode_string(maxLen=maxLen, x=X, repeat=self.repeat, use_spacer=self.use_spacer)
# assert self.shrink_timesteps != True or self.online_training != True, "online_training shrinks automatically " \
# "the files, please deactivate shrink_timesteps"
if self.online_training:
X, Y = DataParsing_main.manipulate_training_data(X=X, Y=Y, subSeqLength=self.dim,
number_subsequences=self.number_subsequences)
elif self.shrink_timesteps:
X, Y, batchsize = DataParsing_main.shrink_timesteps(input_subSeqlength=self.dim, X=X, Y=Y)
pool.close()
pool.join()
return X, tf.keras.utils.to_categorical(Y, num_classes=self.n_classes), sample_weight
__data_generation loads the CSV files to obtain the sequences and their class indices (each host corresponds to one label), determines the target length, and then pushes the sequences through the encoding routines of DataParsing_main.
def _count_valid_files_in_directory(directory, white_list_formats, split,
follow_links):
"""
Copy from keras 2.1.5
Count files with extension in `white_list_formats` contained in directory.
Arguments:
directory: absolute path to the directory
containing files to be counted
white_list_formats: set of strings containing allowed extensions for
the files to be counted.
split: tuple of floats (e.g. `(0.2, 0.6)`) to only take into
account a certain fraction of files in each directory.
E.g.: `segment=(0.6, 1.0)` would only account for last 40 percent
of images in each directory.
follow_links: boolean.
Returns:
the count of files with extension in `white_list_formats` contained in
the directory.
"""
num_files = len(
list(_iter_valid_files(directory, white_list_formats, follow_links)))
if split:
start, stop = int(split[0] * num_files), int(split[1] * num_files)
else:
start, stop = 0, num_files
return stop - start
def parse_amino(x, encoder):
out = []
for i in x:
# dnaSeq = i[1].upper()
dnaSeq = i[0].upper()
encoded_X = encoder.transform(list(dnaSeq))
out.append(encoded_X)
return np.array(out)
def _list_valid_filenames_in_directory(directory, white_list_formats, split,
class_indices, follow_links):
"""Lists paths of files in `subdir` with extensions in `white_list_formats`.
Copy from keras-preprocessing 1.0.9
# Arguments
directory: absolute path to a directory containing the files to list.
The directory name is used as class label
and must be a key of `class_indices`.
white_list_formats: set of strings containing allowed extensions for
the files to be counted.
split: tuple of floats (e.g. `(0.2, 0.6)`) to only take into
account a certain fraction of files in each directory.
E.g.: `segment=(0.6, 1.0)` would only account for last 40 percent
of images in each directory.
class_indices: dictionary mapping a class name to its index.
follow_links: boolean.
# Returns
classes: a list of class indices
filenames: the path of valid files in `directory`, relative from
`directory`'s parent (e.g., if `directory` is "dataset/class1",
the filenames will be
`["class1/file1.jpg", "class1/file2.jpg", ...]`).
"""
dirname = os.path.basename(directory)
if split:
num_files = len(list(
_iter_valid_files(directory, white_list_formats, follow_links)))
start, stop = int(split[0] * num_files), int(split[1] * num_files)
valid_files = list(
_iter_valid_files(
directory, white_list_formats, follow_links))[start: stop]
else:
valid_files = _iter_valid_files(
directory, white_list_formats, follow_links)
classes = []
filenames = []
for root, fname in valid_files:
classes.append(class_indices[dirname])
absolute_path = os.path.join(root, fname)
relative_path = os.path.join(
dirname, os.path.relpath(absolute_path, directory))
filenames.append(relative_path)
return classes, filenames
def _iter_valid_files(directory, white_list_formats, follow_links):
"""Iterates on files with extension in `white_list_formats` contained in `directory`.
# Arguments
directory: Absolute path to the directory
containing files to be counted
white_list_formats: Set of strings containing allowed extensions for
the files to be counted.
follow_links: Boolean.
# Yields
Tuple of (root, filename) with extension in `white_list_formats`.
"""
def _recursive_list(subpath):
return sorted(os.walk(subpath, followlinks=follow_links),
key=lambda x: x[0]) # os.walk yields (root, dirs, files) for every directory below subpath
for root, _, files in _recursive_list(directory):
for fname in sorted(files):
if fname.lower().endswith('.tiff'):
warnings.warn('Using ".tiff" files with multiple bands '
'will cause distortion. Please verify your output.')
if get_extension(fname) in white_list_formats:
yield root, fname
def get_extension(filename):
"""Get extension of the filename
There are newer methods to achieve this but this method is backwards compatible.
"""
return os.path.splitext(filename)[1].strip('.').lower()
These helpers, copied from Keras, iterate over the files of a class directory, count the valid ones and list their paths relative to the parent directory, optionally restricted to a split fraction.
class DataGenerator(tf.keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, directory, classes=None, number_subsequences=32, dim=(32, 32, 32), n_channels=6,
n_classes=10, shuffle=True, n_samples=None, seed=None, faster=True, online_training=False, repeat=True,
use_spacer=False, randomrepeat=False, sequence_length=50, number_samples_per_batch=32 , **kwargs):
'Initialization'
self.directory = directory
self.classes = classes
self.dim = dim
self.labels = None
self.list_IDs = None
self.n_channels = n_channels
self.shuffle = shuffle
self.seed = seed
self.online_training = online_training
self.repeat = repeat
self.use_spacer = use_spacer
self.randomrepeat = randomrepeat
self.maxLen = kwargs.get("maxLen", None)
self.sequence_length = sequence_length
if number_subsequences == 1:
self.shrink_timesteps = False ## no splitting into subsequences
else:
self.shrink_timesteps = True
self.number_subsequences = number_subsequences
if faster == True:
self.faster = 16
elif type(faster) == int and faster > 0:
self.faster = faster
else:
self.faster = 1
self.number_samples_per_batch = number_samples_per_batch * self.faster
self.number_samples_per_class_to_pick = n_samples
if not classes:
classes = []
for subdir in sorted(os.listdir(directory)):
if os.path.isdir(os.path.join(directory, subdir)):
classes.append(subdir)
self.classes = classes
self.n_classes = len(classes)
self.class_indices = dict(zip(classes, range(len(classes))))
# want a dict which contains dirs and number usable files
pool = multiprocessing.pool.ThreadPool()
function_partial = partial(_count_valid_files_in_directory,
white_list_formats={'csv'},
follow_links=None,
split=None) # partial freezes keyword arguments and returns a function-like callable
self.samples = pool.map(function_partial, (os.path.join(directory, subdir) for subdir in classes))
self.samples = dict(zip(classes, self.samples))
results = []
for dirpath in (os.path.join(directory, subdir) for subdir in classes):
results.append(pool.apply_async(_list_valid_filenames_in_directory,
(dirpath, {'csv'}, None, self.class_indices, None)))
# apply_async starts the directory listings in parallel
self.filename_dict = {}
for res in results:
classes, filenames = res.get()
for index, class_i in enumerate(classes):
self.filename_dict.update({f"{class_i}_{index}": filenames[index]})
pool.close()
pool.join()
if not n_samples:
self.number_samples_per_class_to_pick = min(self.samples.values())
# self.elmo_embedder = Elmo_embedder()
self.elmo_embedder = None
self.on_epoch_end()
# for images, one big classes array (for all samples) is built that holds the one-hot encoded class
# build a filename list that stores the corresponding file path
# could be merged with the version linked above
def __len__(self):
'Denotes the number of batches per epoch'
return int(np.floor(len(self.list_IDs) / self.number_samples_per_batch))
def __getitem__(self, index):
'Generate one batch of data'
# Generate indexes of the batch
indexes = self.indexes[index * self.number_samples_per_batch:(index + 1) * self.number_samples_per_batch]
# Find list of IDs
list_IDs_temp = [self.list_IDs[k] for k in indexes]
# Generate data
X, y, sample_weight = self.__data_generation(list_IDs_temp, indexes)
return (X, y)
def on_epoch_end(self):
'make X-train sample list'
"""
go over each class
select randomly #n_sample samples of each class
add selection list to dict with class as key
"""
self.class_selection_path = np.array([])
self.labels = np.array([])
for class_i in self.classes:
samples_class_i = randsomsample(range(0, self.samples[class_i]), self.number_samples_per_class_to_pick)
self.class_selection_path = np.append(self.class_selection_path,
[self.filename_dict[f"{self.class_indices[class_i]}_{i}"] for i in
samples_class_i])
self.labels = np.append(self.labels, [self.class_indices[class_i] for i in samples_class_i])
self.list_IDs = self.class_selection_path
'Updates indexes after each epoch'
self.indexes = np.arange(len(self.list_IDs))
if self.shuffle == True:
if self.seed:
np.random.seed(self.seed)
np.random.shuffle(self.indexes)
The core of this file: for every epoch, on_epoch_end draws a fresh, equally sized random selection of samples per class to be used in the next epoch. This is exactly the repeated random undersampling described in the paper.
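A hypothetical instantiation of the generator (the directory path and most argument values are illustrative guesses; in the repository the generator is created inside model_for_plot):
from vidhop.training.DataGenerator import DataGenerator

# the directory must contain one subfolder of CSV files per host class
train_gen = DataGenerator(directory="/home/user/trainingdata/train",
                          number_subsequences=100, dim=400, n_channels=6,
                          n_classes=6, number_samples_per_batch=32,
                          repeat=True, use_spacer=False, randomrepeat=False)
# model.fit(train_gen, epochs=..., validation_data=(X_val, Y_val))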
Training a new model — train_new_model.py
def training(inpath, outpath, name, epochs, architecture, extention_variant, early_stopping, repeated_undersampling):
''' Train a model on your training files generated with make_dataset
\b
Example:
set input output and name of the model
$ vidhop train_new_model -i /home/user/trainingdata/ -o /home/user/model/ --name test
\b
use the LSTM architecture and the extention variant random repeat
vidhop train_new_model -i /home/user/trainingdata/ --architecture 0 --extention_variant 2
\b
use repeated undersampling for training, note that for this the dataset must have been created with repeated undersampling enabled
vidhop train_new_model -i /home/user/trainingdata/ -r
\b
train the model for 40 epochs, stop training if for 2 epochs the accuracy did not increase
vidhop train_new_model -i /home/user/trainingdata/ --epochs 40 --early_stopping
'''
if extention_variant in (0, 1, 2, 3):
repeat = True
else:
repeat = False
if extention_variant in (2, 3):
randomrepeat = True
else:
randomrepeat = False
if extention_variant in (1, 3):
use_repeat_spacer = True
else:
use_repeat_spacer = False
if extention_variant == 5:
kwargs = dict({"maxLen": -1, "input_subSeqlength": 0})
else:
kwargs = dict()
if extention_variant == 6:
online_training = True
else:
online_training = False
if architecture == 0:
design = 4
else:
design = 7
files = os.listdir(inpath)
assert "Y_train.csv" in files, f"{inpath} must contain Y_train.csv file, but no such file in {files}"
test_and_plot(inpath=inpath, outpath=outpath, suffix=name, online_training=online_training, repeat=repeat,
randomrepeat=randomrepeat, early_stopping_bool=early_stopping, do_shrink_timesteps=True,
use_repeat_spacer=use_repeat_spacer, design=design, nodes=150, faster=True,
use_generator=repeated_undersampling, epochs=epochs, dropout=0.2, accuracy=True, **kwargs)
training() is the entry point for training a new model: depending on the data set, the chosen architecture and extension variant are translated into the corresponding flags (repeat, randomrepeat, use_repeat_spacer, online_training, maxLen) before test_and_plot is called; this is how the structure of the model input is configured.
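For reference, the mapping from extention_variant to the flags above, read directly from the if/else chain (variant 4 leaves all extension flags off):
VARIANT_FLAGS = {
    0: dict(repeat=True, randomrepeat=False, use_repeat_spacer=False),  # simple repeat
    1: dict(repeat=True, randomrepeat=False, use_repeat_spacer=True),   # repeat + spacer
    2: dict(repeat=True, randomrepeat=True,  use_repeat_spacer=False),  # random repeat
    3: dict(repeat=True, randomrepeat=True,  use_repeat_spacer=True),   # random repeat + spacer
    # variant 5 additionally sets kwargs = {"maxLen": -1, "input_subSeqlength": 0}
    # variant 6 switches on online_training
}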
class lrManipulator(tf.keras.callbacks.Callback):
"""
Manipulate the lr for Adam Optimizer
-> no big chances usefull
"""
def __init__(self, nb_epochs, nb_snapshots):
self.T = nb_epochs
self.M = nb_snapshots
def on_epoch_begin(self, epoch, logs={}):
tf.keras.backend.set_value(self.model.optimizer.lr, 0.001)
if ((epoch % (self.T // self.M)) == 0):
tf.keras.backend.set_value(self.model.optimizer.iterations, 0)
tf.keras.backend.set_value(self.model.optimizer.lr, 0.01)
class TimeHistory(tf.keras.callbacks.Callback): # timing callback
"""https://stackoverflow.com/questions/43178668/record-the-computation-time-for-each-epoch-in-keras-during-model-fit"""
def on_train_begin(self, logs={}):
if not hasattr(self, 'times'):
self.times = []
self.time_train_start = time.time()
def on_epoch_end(self, batch, logs={}):
logs = logs or {}
self.times.append(int(time.time()) - int(self.time_train_start))
prediction_val = []
class accuracyHistory(tf.keras.callbacks.Callback):
"""to get the accuracy of my personal voting scores"""
def on_train_begin(self, logs={}):
if not hasattr(self, 'meanVote_val'):
self.meanVote_val = []
self.normalVote_val = []
def on_epoch_begin(self, epoch, logs=None):
global prediction_val
prediction_val = []
def on_epoch_end(self, batch, logs={}):
"""
1. make prediction of train
2. get the voting results
3. calc and save accuracy
4. do same for val set
"""
logs = logs or {}
global prediction_val
if (len(prediction_val) == 0):
prediction_val = (self.model.predict(X_val))
self.prediction_val = prediction_val
y_true_small, y_pred_mean_val, y_pred_voted_val, y_pred, y_pred_mean_exact = \
calc_predictions(X_val, Y_val, do_print=False, y_pred=self.prediction_val)
self.normalVote_val.append(metrics.accuracy_score(y_true_small, y_pred_voted_val))
self.meanVote_val.append(metrics.accuracy_score(y_true_small, y_pred_mean_val))
class roc_History(tf.keras.callbacks.Callback):
"""to get the AUC of my personal voting scores"""
# https://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html
def __init__(self, name, path):
self.name = name
self.path = path
def on_train_begin(self, logs={}):
if not hasattr(self, 'roc_val'):
# roc curve values for validation set
self.roc_macro = []
# roc curve values of the joined subsequences for the validation set
self.roc_mean_val = []
# roc curve values of the vote of the subsequences for the validation set
self.roc_meanVote_val = []
# thresholds per class
self.thresholds = []
# accuracy with general threshold tuning
self.acc_val_threshold_tuned = []
# accuracy with multi-threshold tuning
self.acc_val_multi_thresholds_tuned = []
def on_epoch_begin(self, epoch, logs=None):
global prediction_val
prediction_val = []
def on_epoch_end(self, batch, logs={}):
"""
1. make prediction of train
2. get the voting results
3. calc and save accuracy
4. do same for val set
"""
logs = logs or {}
# check if allready calculated validation results, if no calc new
global prediction_val
if (len(prediction_val) == 0):
prediction_val = (self.model.predict(X_val))
self.prediction_val = prediction_val
y_true_small, y_pred_mean_val, y_pred_voted_val, y_pred, y_pred_mean_val_exact = \
calc_predictions(X_val, Y_val, do_print=False, y_pred=self.prediction_val)
n_classes = Y_val.shape[-1]
y_true_small_bin = tf.keras.utils.to_categorical(y_true_small, n_classes)
y_pred_mean_val_bin = tf.keras.utils.to_categorical(y_pred_mean_val, n_classes)
These callbacks tweak the Adam learning rate, record the wall-clock time per epoch, and after every epoch compute the accuracy and AUC of the voting/mean aggregation on the validation set; they also produce the corresponding plots (plotting code omitted here).
class prediction_history(tf.keras.callbacks.Callback):
"""Callback subclass that prints each epoch prediction"""
def on_epoch_end(self, epoch, logs={}):
p = np.random.permutation(len(Y_val)) # shuffle the validation samples
shuffled_X = X_val[p]
shuffled_Y = Y_val[p]
self.predhis = (self.model.predict(shuffled_X[0:10]))
y_pred = np.argmax(self.predhis, axis=-1)
y_true = np.argmax(shuffled_Y, axis=-1)[0:10]
print(f"Predicted: {y_pred}")
print(f"True: {y_true}")
table = pd.crosstab(
pd.Series(y_true),
pd.Series(y_pred),
rownames=['True'],
colnames=['Predicted'],
margins=True)
print(table)
class History(tf.keras.callbacks.Callback):
"""
Callback that records events into a `History` object.
This callback is automatically applied to
every Keras model. The `History` object
gets returned by the `fit` method of models.
"""
def on_train_begin(self, logs=None):
if not hasattr(self, 'epoch'):
self.epoch = []
self.history = {}
def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
self.epoch.append(epoch)
for k, v in logs.items():
self.history.setdefault(k, []).append(v)
class StopEarly(tf.keras.callbacks.Callback):
"""
Callback that stops training after an epoch
important for online training
"""
def on_epoch_end(self, epoch, logs=None):
self.model.stop_training = True
prediction_history prints a small cross-tab of predictions on ten shuffled validation samples after each epoch, History records the per-epoch logs, and StopEarly ends training after a single epoch (used for online training). The actual early stopping, which halts training once the validation accuracy stops improving, is added below via Keras' EarlyStopping callback.
def model_for_plot(inpath, outpath, design=1, sampleSize=1, nodes=32, suffix="", epochs=100, dropout=0,
faster=False, voting=False, tensorboard=False, early_stopping_bool=True,
shuffleTraining=True, batch_norm=False, online_training=False,
number_subsequences=1, use_generator=True, repeat=True, use_spacer=False, randomrepeat=False,
**kwargs):
"""
method to train a model with specified properties, saves training behavior in /$path/"history"+suffix+".csv"
:param design: parameter for complexity of the NN, 0 == 2 layer GRU, 1 == 2 layer LSTM, 2 == 3 layer LSTM
:param sampleSize: fraction of samples that will be used for training (1/samplesize). 1 == all samples, 2 == half of the samples
:param nodes: number of nodes per layer
:param suffix: suffix for output files
:param epochs: number of epochs to train
:param dropout: rate of dropout to use, 0 == no Dropout, 0.2 = 20% Dropout
:param timesteps: size of "memory" of LSTM, don't change if not sure what you're doing
:param faster: speedup due higher batch size, can reduce accuracy
:param outpath: define the directory where the training history should be saved
:param voting: if true than saves the history of the voting / mean-predict subsequences, reduces training speed
:param tensorboard: for observing live changes to the network, more details see web
:param cuda: use GPU for calc, not tested jet, not working
:return: dict with loss and model
"""
model = tf.keras.models.Sequential()
global batch_size, X_train, X_test, Y_train
# Y_train_noOHE = np.argmax(Y_train, axis=1)
if use_generator:
class_weight = None
else:
Y_train_noOHE = [y.argmax() for y in Y_train]
class_weight = clw.compute_class_weight('balanced', np.unique(Y_train_noOHE), Y_train_noOHE)
class_weight_dict = {i: class_weight[i] for i in range(len(class_weight))}
class_weight = class_weight_dict
print(f"class_weights: {class_weight}")
timesteps = X_test.shape[1]
if faster:
batch = batch_size
else:
batch = batch_size
if design == 0:
model.add(tf.keras.layers.GRU(nodes, input_shape=(timesteps, X_test.shape[-1]), return_sequences=True,
dropout=dropout))
model.add(tf.keras.layers.GRU(nodes, dropout=dropout))
if design == 1:
model.add(tf.keras.layers.LSTM(nodes, input_shape=(timesteps, X_test.shape[-1]), return_sequences=True,
dropout=dropout))
model.add(tf.keras.layers.LSTM(nodes, dropout=dropout))
if design == 2:
model.add(tf.keras.layers.LSTM(nodes, input_shape=(timesteps, X_test.shape[-1]), return_sequences=True,
dropout=dropout))
if batch_norm:
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.LSTM(nodes, return_sequences=True, dropout=dropout))
if batch_norm:
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.LSTM(nodes, dropout=dropout))
if batch_norm:
model.add(tf.keras.layers.BatchNormalization())
if design == 3:
model.add(tf.keras.layers.LSTM(nodes, input_shape=(timesteps, X_test.shape[-1]), return_sequences=True,
dropout=dropout))
model.add(tf.keras.layers.LSTM(nodes, return_sequences=True, dropout=dropout))
model.add(tf.keras.layers.LSTM(nodes, return_sequences=True, dropout=dropout))
model.add(tf.keras.layers.LSTM(nodes, dropout=dropout))
...... (part of the code omitted)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'], sample_weight_mode=None)
# return model
filepath = outpath + "/model_best_acc_" + suffix + ".hdf5"
filepath2 = outpath + "/model_best_loss_" + suffix + ".hdf5"
checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath, monitor='val_acc', verbose=1, save_best_only=True,
mode='max')
checkpoint2 = tf.keras.callbacks.ModelCheckpoint(filepath2, monitor='val_loss', verbose=1, save_best_only=True,
mode='min')
predictions = prediction_history()
time_callback = TimeHistory()
if early_stopping_bool:
early_stopping = tf.keras.callbacks.EarlyStopping('val_acc', min_delta=0, patience=epochs // 20,
restore_best_weights=True, verbose=2)
# early_stopping2 = EarlyStopping('val_loss', min_delta=0, patience=epochs//20,restore_best_weights=True)
callbacks_list = [checkpoint, checkpoint2, predictions, time_callback, early_stopping]
else:
callbacks_list = [checkpoint, checkpoint2, predictions, time_callback]
# callbacks_list = [early_stopping2, early_stopping, predictions, time_callback]
if voting:
myAccuracy = accuracyHistory()
myRoc = roc_History(name=suffix, path=outpath)
callbacks_list.append(myAccuracy)
callbacks_list.append(myRoc)
if tensorboard:
if not os.path.isdir(outpath + '/my_log_dir'):
os.makedirs(outpath + '/my_log_dir')
tensorboard = tf.keras.callbacks.TensorBoard(
# Log files will be written at this location
log_dir=outpath + '/my_log_dir',
# We will record activation histograms every 1 epoch
histogram_freq=1,
# We will record embedding data every 1 epoch
embeddings_freq=1,
)
tensorboard = tf.keras.callbacks.TensorBoard(log_dir=outpath + '/my_log_dir', histogram_freq=0, batch_size=32,
write_graph=True, write_grads=False, write_images=False,
embeddings_freq=0, embeddings_layer_names=None,
embeddings_metadata=None)
callbacks_list.append(tensorboard)
if use_generator:
from vidhop.training.DataGenerator import DataGenerator
params = {"number_subsequences": number_subsequences, "dim": timesteps, "n_channels": X_test.shape[-1],
"number_samples_per_batch": batch_size,
"n_classes": Y_test.shape[-1], "shuffle": shuffleTraining, "online_training": online_training,
"seed": 1, "repeat": repeat, "use_spacer": use_spacer, "randomrepeat": randomrepeat, "faster": faster}
# global directory
training_generator = DataGenerator(directory=inpath + "/train", **params, **kwargs)
hist = model.fit(training_generator, epochs=epochs, callbacks=callbacks_list, validation_data=(X_val, Y_val),
class_weight=class_weight, shuffle=shuffleTraining)
else:
if online_training == True:
print("use online training")
model_for_plot assembles the network for the selected design, optionally computes balanced class weights, registers the callbacks (checkpoints for best validation accuracy and best validation loss, early stopping, the custom accuracy/ROC histories, optionally TensorBoard), and then trains either with the DataGenerator or directly on the in-memory arrays; the best weights and per-epoch metrics are written to the output directory.
def calc_predictions(X, Y, y_pred, do_print=False):
"""
plot predictions
:param X: raw-data which should be predicted
:param Y: true labels for X
:param do_print: True == print the cross-tab of the prediction
:param y_pred: array with predicted labels for X
:return: y_true_small == True labels for complete sequences, yTrue == True labels for complete subsequences, y_pred_mean == with mean predicted labels for complete sequences, y_pred_voted == voted labels for complete sequences, y_pred == predicted labels for complete subsequences
"""
def print_predictions(y_true, y_pred, y_true_small, y_pred_voted, y_pred_sum, y_pred_mean_weight_std,
y_pred_mean_weight_ent):
table = pd.crosstab(
pd.Series(y_encoder.inverse_transform(y_true)),
pd.Series(y_encoder.inverse_transform(y_pred)),
rownames=['True'],
colnames=['Predicted'],
margins=True)
print("standard version")
print(table.to_string())
accuracy = metrics.accuracy_score(y_true, y_pred)
print("standard version")
print("acc = " + str(accuracy))
table = pd.crosstab(
pd.Series(y_encoder.inverse_transform(y_true_small)),
pd.Series(y_encoder.inverse_transform(y_pred_voted)),
rownames=['True'],
colnames=['Predicted'],
margins=True)
print("vote version")
print(table.to_string())
accuracy = metrics.accuracy_score(y_true_small, y_pred_voted)
print("vote version")
print("acc = " + str(accuracy))
table = pd.crosstab(
pd.Series(y_encoder.inverse_transform(y_true_small)),
pd.Series(y_encoder.inverse_transform(y_pred_sum)),
rownames=['True'],
colnames=['Predicted'],
margins=True)
...... (remaining code omitted)
calc_predictions evaluates the different accuracy variants listed in the paper (standard per-subsequence, vote, mean, std-div weighted) and prints the corresponding cross-tabulations.
model_path1 = f"{outpath}/model_best_loss_{suffix}.hdf5"
model_path2 = f"{outpath}/model_best_acc_{suffix}.hdf5"
for model_path in (model_path1, model_path2):
print("load model:")
print(model_path)
model = tf.keras.models.load_model(model_path)
pred = model.predict(X_test)
y_true_small, y_pred_mean, y_pred_voted, y_pred, y_pred_mean_exact = calc_predictions(X_test, Y_test,
y_pred=pred,
do_print=True)
print("make test")
myRoc = roc_History(name="_".join(model_path[len(outpath):].split("_")[1:3]) + "_" + suffix, path=outpath)
# myRoc = roc_History(name=suffix, path=outpath)
myRoc.on_train_begin()
global prediction_val
prediction_val = model.predict(X_test)
X_val = X_test
Y_val = Y_test
myRoc.on_epoch_end(0)
# create and export .model file
index_classes = dict()
for i in zip(y_encoder.transform(y_encoder.classes_), y_encoder.classes_):
index_classes.update({i[0]: i[1]})
repeat = True
use_spacer = False
online = False
random_repeat = True
design = design
multi_thresh = myRoc.thresholds[-1]
hosts = Y_test.shape[-1]
pickle.dump(
(model.to_json(), model.get_weights(), index_classes, multi_thresh, maxLen, repeat, use_repeat_spacer,
online_training, randomrepeat, design, hosts), open(f"{model_path.split('.hdf5')[0]}.model", "wb"))
Finally, the checkpoints with the best validation loss and best validation accuracy are re-loaded, evaluated on the test set to obtain accuracy and loss, and exported together with the label encoder classes, thresholds and preprocessing settings as a .model file.
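A minimal sketch of re-loading such a checkpoint and scoring it on the test split (the path is illustrative; X_test and Y_test are assumed to be the encoded test arrays used in the script above):
import numpy as np
import tensorflow as tf

model = tf.keras.models.load_model("/home/user/model/model_best_acc_test.hdf5")
y_prob = model.predict(X_test)             # per-subsequence class probabilities
y_pred = np.argmax(y_prob, axis=-1)
y_true = np.argmax(Y_test, axis=-1)
print("subsequence accuracy:", (y_pred == y_true).mean())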
