Distributed and Asynchronous ML Training: A Survey on
作者:禅与计算机程序设计艺术
1.简介
机器学习(ML)作为一门关键学科,在多个领域均展现出显著的重要性。
涉及广泛且易被理解的技术领域的深度学习(DL)与强化学习(RL),都基于海量数据、强大的计算能力以及高效算法。
如今已有越来越多的企业与机构认识到机器学习的强大应用价值。
然而这些技术通常需要投入大量的人力物力才能实现更高的效率。
为了提高训练效率并缩短收敛时间,在研究层面逐渐出现了分布式处理与异步优化的方法。
本文将系统地梳理机器学习(ML)的传统方法和最近的研究进展——即异步或分布式方法——并重点分析它们在不同任务与场景下的优劣之处。
2.基本概念和术语
2.1 异步训练
异步训练是指在训练过程中实现模型参数更新与计算操作能够实现并行化的过程,在同步训练模式下每一轮迭代都需要依赖前一轮获得的所有参数值作为基础进行模型优化调整,在同步训练模式下模型参数更新与计算操作是严格同步进行的整个迭代周期会占用较多的时间资源而在异步训练模式下由于各子网络之间可以独立完成参数更新任务因此整体运算效率得到显著提升。具体而言异步训练方法主要包含以下几种实现方式:第一种方法是采用轮次同步机制即各个子网络每隔一定时间才进行一次全局同步以保证同步信息的一致性;第二种方法则是采用渐进式同步策略通过逐步提高同步频率来平衡效率与准确性之间的关系;第三种方法是通过引入延迟机制使得各子网络能够根据最新的可用信息进行局部优化进而减少全局同步对系统性能的影响;第四种方法则是基于事件驱动的设计思想通过动态管理各个子网络的状态以提高系统的整体运行效率。
- 数据并行:基于多块数据分别执行模型参数更新任务。例如,在神经网络训练过程中可采用K折交叉验证法将原始数据划分为若干块独立的数据块(subsets),每一块对应一个独立计算节点(compute node)执行任务。
- 模型并行:采用完全相同的网络结构复制到各个GPU设备中,并分割为两组独立任务分别在不同计算节点上运行,在完成各自的任务后汇总得到最终参数。
- 层次并行:对网络各层次模块展开异步优化操作,在每次迭代过程中先完成卷积模块、全连接模块等逐层展开多轮迭代运算后再进入下一层次优化过程。
2.2 分布式训练
分布式训练的工作原理是将训练任务划分为多台计算机上的子任务,并汇总结果后最终更新模型参数。与传统的集中式架构相比,这种架构能够显著提高计算资源的利用率,并实现更快的响应速度。目前主流的分布式训练框架主要包括两种类型:
- TensorFlow/Horovod:TensorFlow是由Google公司提供的开源深度学习框架,并通过一系列实现分布式运行所需的机制和API接口帮助用户快速构建高效的分布式训练集群系统。Horovod则基于Message Passing Interface(MPI)实现了高效的同步与异步通信功能,在多台计算机上自动创建并管理进程以完成复杂的分布式计算任务。
- Apache MXNet/Spark:MXNet是一种高效且资源消耗低下的分布式训练解决方案,在单机多GPU环境下能够实现并行训练任务,并支持在多节点集群中展开分布式的规模计算任务。而Apache Spark则是一款高性能的数据处理引擎,在支持大规模数据流处理、离线数据分析以及机器学习模型部署等方面展现出卓越的能力。
3.核心算法原理及操作步骤
3.1 数据并行训练
数据并行训练是一种广泛采用的异步训练方法。其核心理念是将相同的训练任务在不同节点上进行反复执行,并通过分配不同的 training data slices 到各个节点中来完成分布式计算的目的。例如,在实现过程中通常会将整个数据集均等地划分为N个 slices,并由每个节点独立处理对应的 slice 数据。这样每个 slice 的数据仅需经过一次处理即可更新模型参数。为了确保各节点之间的计算负担能够均衡分布,在实际应用中常常采用工作窃取机制来平衡负载压力。具体算法流程如下:
- 假设计算资源足够丰富,在此基础上将待处理的数据集按照一定比例均等分割为K个子集,并分配给各个节点处理。
- 每个计算单元独立随机选择自身子集用于本地模型训练。
- 每个单元获取来自其他单元的数据批次,并将其组织成批次供神经网络进行前向传播。
- 对每一个批次的损失函数值进行计算后汇总在一起。
- 通过优化算法更新模型各参数权重参数。
3.2 模型并行训练
该方法即联邦学习技术。
其核心理念是通过将模型复制至多个节点进行训练,并确保各节点之间不共享参数。
在每一次迭代过程中,在不同节点之间进行通信以交换权重信息。
从而实现参数更新。
通过这种方案能够显著地降低模型规模,并提高训练效率以及资源利用率。
具体算法流程如下:
- 当计算资源足够时,在K个节点上复制模型。
- 在每个节点上随机初始化模型参数后,在神经网络中生成一批数据并完成前向传播。
- 对每个批次的损失函数值进行计算后进行汇总。
- 通过优化器更新模型参数。
- 对各个节点的参数执行裁剪以避免溢出后,在其他节点上传并汇总。
- 将所有节点更新后的参数汇总后应用至整体模型上。
3.3 层次并行训练
层次并行训练主要体现在将模型各层的训练过程独立开展,在不互相干扰的前提下实现各层参数更新与优化。该方法不仅显著提升了训练效率和资源利用率,在深度神经网络领域中还形成了良好的并行结构。具体而言,在模型构建阶段需要遵循严格的层次化设计原则;而在实际运行过程中,则可以通过动态调度机制实现各计算节点的最佳匹配与协作配合。详细的算法步骤如下:
依据神经网络架构图实现多层感知机结构中的并行计算机制。各计算单元依据神经网络架构图确定各自对应的处理子网络,并独立获取本地训练样本完成指定的任务运算以提取特征。各子网络生成的特征经拼接或缩减操作整合后传递至下一层级单元处理。通过优化算法更新模型参数完成训练迭代过程。
3.4 小批量梯度下降
Mini-Batch Gradient Descent (MBGD) 是一种非常常用的梯度下降方法,在优化计算成本的同时也能带来更好的训练效果提升。它通过分批将样本输入模型进行训练,在每轮迭代过程中仅更新一次梯度信息而不是所有样本的一次性更新。由于每个批次内部的梯度能够更加准确地反映整体梯度趋势从而使得整个模型的收敛速度会更加高效并且稳定性更强。MBGD不仅支持同步模式下的分布式训练也适合异步分布式场景下的训练过程。具体的算法实现步骤如下所述:
获取训练数据集D,并将其按照规定的批量大小划分为多个mini-batch Dk。
初始化模型参数θk为零值。
对于每一个mini-batch k,在当前mini-batch上求解模型输出yk = f(x;θk)。
计算当前mini-batch k上的损失函数Jk=(yk-yk')^2,并对损失函数关于θ求梯度δJk/dθ。
通过调整αk乘以梯度δJk/dθ来更新模型参数θk。
依次执行上述步骤3至步骤5,直至所有 mini-batch 的损失函数值均不再下降或满足其他终止条件。
3.5 SGD with Momentum
采用动量项的SGD是一种常用的有效优化算法。与基本SGD相比,在每一步更新中都会引入一个动量因子来加速目标函数值下降的速度。这种改进机制的核心优势在于引入动量因子以抑制优化过程中可能出现的振荡现象。具体而言,在每一次迭代过程中,算法通过累积历史梯度信息来维持参数更新的方向性特征,并在此基础上动态调整当前步长的选择范围以避免出现沿着某个方向过于剧烈地振荡或偏离最小值区域的情况。
- 初始化模型参数为 θ 值等于零,并选定动量参数 α 用于优化过程。
- 从训练数据集中选取一小批量样本集 D_k ,通过模型计算在该 mini-batch 上的输出结果 y = f(x; θ) ,并计算对应的目标与预测值之间的损失函数值 J_t = L(y, t) ,其中 L 表示损失函数。
- 更新参数 θ 采用 θ = θ - α∇J_t 的方式 ,其中 ∇J_t 表示损失函数 J_t 对参数 θ 的梯度向量。
- 更新动量变量 v 按照 v = β v + (1 − β)∇J_t 进行迭代更新 ,其中 β 是动量因子。
- 依次执行第 3 步和第 4 步循环操作 ,直至达到预设终止条件或满足收敛标准。
3.6 ASGD
ASGD常被缩写为Average Stochastic Gradient Descent(平均随机梯度下降),主要采用异步更新机制,并广泛应用于样本数量巨大的情形中。它特别适用于训练集规模远超模型参数数量的情况。其核心概念在于持续收集数据并据此更新模型参数而非仅依赖于单一批采样数据来进行参数更新。此外,在优化方法上,ASGD本质上融合了Mini-Batch Gradient Descent与Momentum优化方法的特点。具体算法流程如下:初始化优化器相关超参数后,在迭代过程中从训练集中随机抽取一个批次的数据,并计算该批次对应的目标函数梯度估计值;随后将所有梯度估计值取平均值以获得最终模型参数更新的方向向量;重复上述步骤直至收敛条件满足为止。
初始化模型参数θ的值为θ0。
将初始历史样本集合H定义为仅包含当前样本x1的集合。
从数据集D中随机抽取一个新的样本数据点xk。
通过模型f(x;θ),计算当前输入x对应的输出值y以及对应的损失函数值Jl = L(y, l)。
将新采集的样本数据点xk及其相关的参数θ和损失函数值Jl加入到历史样本集合H中。
根据公式[ (1/m)∑{j=1}^m h^k(x_j), ∑{j=1}^m h^k(g_j) ) ] = [h_k, g_k]来更新模型参数θ的值。
循环回到步骤3直到所有训练数据集D中的所有样本都被处理完毕。
4.具体代码实例
4.1 数据并行训练代码实例
import numpy as np
from sklearn import datasets
from sklearn.model_selection import KFold
def get_data():
X, y = datasets.make_classification(n_samples=1000, n_features=50, n_classes=2)
return X, y
def data_parallel(X, y):
cv = KFold(n_splits=5)
for train_index, test_index in cv.split(X):
# split the dataset into training and testing set based on folds
x_train, x_test = X[train_index], X[test_index]
y_train, y_test = y[train_index], y[test_index]
# train a model using x_train and y_train
model.fit(x_train, y_train)
# evaluate the trained model on the testing set of each fold
score = model.score(x_test, y_test)
print('Score:', score)
if __name__ == '__main__':
X, y = get_data()
data_parallel(X, y)
代码解读
4.2 模型并行训练代码实例
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.nn.modules.loss import CrossEntropyLoss
class CIFARDataset(torch.utils.data.Dataset):
def __init__(self, data, target, transform=None):
self.transform = transform
self.data = data
self.target = target
def __len__(self):
return len(self.data)
def __getitem__(self, index):
img, target = self.data[index], int(self.target[index])
if self.transform is not None:
img = self.transform(img)
return img, target
def cifar_model_parallel(train_dataset, device):
num_devices = torch.cuda.device_count()
batch_size = 32 // num_devices
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, drop_last=True)
loss_fn = CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
net = models.resnet50()
net.fc = torch.nn.Linear(2048, 10)
net = torch.nn.DataParallel(net).to(device)
for epoch in range(epochs):
running_loss = 0.0
for i, data in enumerate(train_loader):
inputs, labels = data
optimizer.zero_grad()
outputs = net(inputs.to(device))
loss = loss_fn(outputs, labels.to(device))
loss.backward()
optimizer.step()
running_loss += loss.item()
print('[%d/%d][%d/%d]\tLoss: %.3f' %
(epoch + 1, epochs, i + 1, len(train_loader), running_loss / i))
代码解读
4.3 层次并行训练代码实例
import tensorflow as tf
from keras.layers import Input, Dense, Conv2D, MaxPooling2D, Flatten
from keras.models import Model
def layered_training():
input_shape = (28, 28, 1)
output_shape = 10
base_input = Input(shape=input_shape)
x = Conv2D(filters=32, kernel_size=(3, 3))(base_input)
x = MaxPooling2D()(x)
x = Flatten()(x)
x = Dense(units=128)(x)
base_output = Dense(units=output_shape, activation='softmax')(x)
feature_extraction_model = Model(base_input, base_output)
feature_extraction_model.compile(optimizer='adam',
loss='categorical_crossentropy',
metrics=['accuracy'])
classifier_input = Input(shape=feature_extraction_model.output.shape[1:])
x = Flatten()(classifier_input)
x = Dense(units=64, activation='relu')(x)
classifier_output = Dense(units=output_shape, activation='softmax')(x)
final_model = Model([feature_extraction_model.input, classifier_input], [feature_extraction_model.output, classifier_output])
final_model.compile(optimizer='adam',
loss={'FeatureExtractionModel': 'binary_crossentropy',
'ClassifierModel': 'categorical_crossentropy'},
loss_weights={'FeatureExtractionModel': 0.9,
'ClassifierModel': 0.1})
...
if __name__ == '__main__':
layered_training()
代码解读
