(3) How to Build Realtime Deep Learning Models using Am
作者:禅与计算机程序设计艺术
1.简介
作为 AWS 提供的核心机器学习平台,SageMaker 集成了丰富功能,支持从模型构建到训练部署的全流程操作。本文旨在帮助读者快速掌握 SageMaker 实时深度学习的核心内容。利用 MNIST 数据集开发一个基于手写数字识别的手动模型,并将其部署至 SageMaker 平台进行推理操作。
2.前置条件
- 具备一定的机器学习基础(熟悉数据表示方法与模型评估技术),并对神经网络的工作原理有基本认知;
- 掌握基于 Python 的机器学习框架(包括但不限于 scikit-learn 和 TensorFlow);
- 能够编写并使用 Jupyter Notebook 或其他编程环境进行 Python 编程;
- 在无特殊需求且不依赖特定框架的情况下,默认推荐选择 MXNet 深度学习框架进行开发部署。该框架支持在 CPU 和 GPU 上高效运行,并具备强大的数值计算能力和自动求导功能;
- 已安装 boto3、sagemaker 和 mxnet 库,并可参考官方文档 https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/setup-python.html 了解详细安装步骤;
- 已注册 AWS 账户并完成 SageMaker 设置工作。
3.什么是深度学习?为什么要使用深度学习?
深度学习(deep learning)是机器学习领域中的一个重要组成部分。该技术旨在通过设计复杂的多层神经元连接架构模仿生物神经系统的工作原理,在数据特征提取、信息处理以及数据分析方面发挥重要作用。研究者们通过多层次人工神经元之间的非线性映射关系,在模式识别任务中取得了显著成果,并成功实现了类似人类认知的学习能力和决策机制。
深度学习模型具有以下优点:
- 参数规模较小,并有效降低了计算复杂度。
- 借助激活函数与损失函数的调控机制,在一定程度上能更准确地反映数据分布特征。
- 该方法完全与标签无关,并且对于输入噪声具有较高的鲁棒性。
- 支持端到端学习流程的同时能够有效地提取原始数据中的关键特征。
- 经过充分训练的数据集显著提升了模型的泛化能力。
传统机器学习方法一般仅限于处理静态信息,在面对动态场景时则显得力不从心。相比之下,深度学习架构能够有效管理动态信息,并特别适用于视频、图像、文本、音频等序列性较强的多模态数据分析场景。在大量标注或无标注的数据环境下积累经验后,在面对复杂的数据特征时能够显著提升了数据分析的整体效能
4.基本概念术语说明
(1)MNIST 数据集
这是一个经过修订的、广受欢迎的手写数字识别基准数据集。该集合包含有6万张训练图片和1万张测试图片。每个样本都是单色的手写数字图像,并具有28×28像素的分辨率。
MNIST 数据集的目标是识别手写数字,共有10类,分别对应0~9。
(2)机器学习模型
机器学习模型主要用于基于输入数据预测相应的输出结果,并作为算法或系统的存在。常见的机器学习模型包括决策树、随机森林以及支持向量机等多种类型。
在图像识别领域中,在用于该领域任务时, 涉及的常见机器学习模型包括卷积神经网络技术(CNN)、循环神经网络技术(RNN)、长短期记忆单元(LSTM)、门控递归单元技术(GRU)等其他相关技术.
(3)Amazon SageMaker
该平台整合了全面的机器学习功能,并通过自动化流程支持用户快速构建模型并实现其训练与部署过程
(4)MXNet
MXNet 是一个开放源代码的深度学习框架,在性能、速度和灵活性方面都表现出色。MXNet 在深度学习领域占据着领先地位的原因在于其强大的计算能力和高度灵活的支持系统。具体而言:
- 高性能计算能力使得模型训练更快;
- 快速迭代能力允许开发者不断优化算法;
- 高度可定制化的API则满足了各种复杂需求。
- 性能:该系统采用高度优化的核心计算引擎,并支持多个GPU核心同时运行以提升处理效率。
- 灵活性:该框架提供友好的编程接口,并且允许用户根据需求灵活地自定义模型架构、损失函数以及优化策略。
- 易用性:该工具集提供了便捷的命令行界面以及基于符号的操作界面两种方式供用户选择。
5.核心算法原理和具体操作步骤以及数学公式讲解
(1)手写数字识别模型
准备数据
首先需要准备MNIST数据集,可以使用如下代码下载并解压:
import os
import urllib.request
import gzip
# Download the dataset from http://yann.lecun.com/exdb/mnist/
url = "http://yann.lecun.com/exdb/mnist/"
files = ["train-images-idx3-ubyte.gz",
"train-labels-idx1-ubyte.gz",
"t10k-images-idx3-ubyte.gz",
"t10k-labels-idx1-ubyte.gz"]
for file in files:
filename = file[:-3]
if not os.path.isfile(filename):
filepath = os.path.join("data/", file)
print("Downloading %s..." % filepath)
urllib.request.urlretrieve(url + file, filepath)
# Uncompress the downloaded file
with gzip.open(filepath, 'rb') as f_in:
data = f_in.read()
with open(filename, 'wb') as f_out:
f_out.write(data)
print("%s decompressed." % filename)
代码解读
然后将下载下来的文件转换为可以被MXNet读取的数据格式:
from mxnet import ndarray as nd
import numpy as np
def read_data(label_url, image_url):
with gzip.open(label_url, 'rb') as flbl:
magic, num = struct.unpack(">II", flbl.read(8))
label = nd.array(np.frombuffer(flbl.read(), dtype=np.uint8).astype(np.int32))
with gzip.open(image_url, 'rb') as fimg:
magic, num, rows, cols = struct.unpack(">IIII", fimg.read(16))
image = nd.array(np.frombuffer(fimg.read(), dtype=np.uint8).reshape(num, rows, cols)/255)
return image, label
train_data = [
("data/train-images-idx3-ubyte.gz", "data/train-labels-idx1-ubyte.gz"),
("data/t10k-images-idx3-ubyte.gz", "data/t10k-labels-idx1-ubyte.gz")
]
for img, lbl in train_data:
image, label = read_data(lbl, img)
print('image:', type(image), image.shape)
print('label:', type(label), label.shape)
代码解读
此处将读取到的图片数据除以255来归一化为[0, 1]范围内的值。
定义模型
首先导入所需的包:
import mxnet as mx
from mxnet import gluon
from mxnet import autograd
import logging
logging.getLogger().setLevel(logging.DEBUG)
代码解读
建立一个简单的单层神经网络模型作为研究对象,在该模型中包含1个隐藏层和10个输出节点。其中激活函数选择使用Softmax activation function,损失函数采用交叉熵损失函数。
net = gluon.nn.Sequential()
with net.name_scope():
net.add(gluon.nn.Dense(256, activation="relu"))
net.add(gluon.nn.Dense(10))
net.collect_params().initialize()
代码解读
训练模型
基于MNIST数据集规模较小,在较短时间内即可实现较高的准确率。因此仅进行少量迭代次数即可演示模型训练流程。
batch_size = 100
learning_rate =.1
epochs = 3
loss = gluon.loss.SoftmaxCrossEntropyLoss()
trainer = gluon.Trainer(net.collect_params(),'sgd', {'learning_rate': learning_rate})
for e in range(epochs):
for i in range(0, len(image), batch_size):
data = image[i:i+batch_size].as_in_context(ctx)
label = label[i:i+batch_size].as_in_context(ctx)
with autograd.record():
output = net(data)
L = loss(output, label)
L.backward()
trainer.step(batch_size)
train_acc = nd.mean(nd.argmax(output, axis=1)==label).asscalar()
print("Epoch %s. Loss: %s, Train acc %s" % (e, L.mean().asscalar(), train_acc))
代码解读
测试模型
最后测试一下模型的准确率:
test_data = [
('data/t10k-images-idx3-ubyte.gz', 'data/t10k-labels-idx1-ubyte.gz'),
]
correct_predictions = 0
total_samples = 0
for img, lbl in test_data:
images, labels = read_data(lbl, img)
outputs = []
for i in range(0, len(images), batch_size):
sample_images = images[i:i+batch_size].as_in_context(ctx)
outputs.append(net(sample_images))
predictions = nd.concat(*outputs, dim=0).argmax(axis=1)
total_samples += predictions.size
correct_predictions += (predictions == labels.as_in_context(ctx)).sum().asscalar()
accuracy = float(correct_predictions) / float(total_samples)*100
print("Test accuracy: %.2f%%" % accuracy)
代码解读
(2)部署模型
当模型训练完成后, 我们就可以将其部署到 SageMaker 平台上进行推理操作. 具体来说, 首先在 SageMaker 平台中建立一个笔记本实例, 然后选择 MXNet 镜像作为运行环境.
打开 notebook 时,先要配置一些环境变量:
import sagemaker
from sagemaker import get_execution_role
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_session.region_name
prefix ='sagemaker/DEMO-realtime-deep-learning'
role = get_execution_role()
代码解读
随后制作训练代码,并将之前的代码进行融合处理即可
import subprocess
subprocess.call(['pip', 'install', '-r', './requirements.txt'])
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
import mxnet as mx
from mxnet import gluon
from mxnet import autograd
import sys
import json
import struct
import numpy as np
import gzip
import io
import subprocess
if __name__=='__main__':
try:
sm_hosts = json.loads(os.environ['SM_HOSTS'])
current_host = socket.gethostbyname(socket.gethostname())
host_index = sm_hosts.index(current_host)+1
num_workers = len(sm_hosts)
print('Number of workers={}'.format(num_workers))
print('Host index={}'.format(host_index))
ctx = mx.gpu(host_index%len(mx.test_utils.list_gpus()))
except KeyError:
logger.exception('Exception while getting SM_HOSTS environment variable.')
sys.exit(1)
url = "http://yann.lecun.com/exdb/mnist/"
files = ["train-images-idx3-ubyte.gz",
"train-labels-idx1-ubyte.gz",
"t10k-images-idx3-ubyte.gz",
"t10k-labels-idx1-ubyte.gz"]
for file in files:
filename = file[:-3]
if not os.path.isfile(filename):
filepath = os.path.join("/opt/ml/input/data/training/", file)
print("Downloading %s..." % filepath)
with urllib.request.urlopen(url + file) as response, open(filepath, 'wb') as out_file:
shutil.copyfileobj(response, out_file)
# Uncompress the downloaded file
with gzip.open(filepath, 'rb') as f_in:
data = f_in.read()
with open(filename, 'wb') as f_out:
f_out.write(data)
print("%s decompressed." % filename)
def read_data(label_url, image_url):
with gzip.open(label_url, 'rb') as flbl:
magic, num = struct.unpack(">II", flbl.read(8))
label = nd.array(np.frombuffer(flbl.read(), dtype=np.uint8).astype(np.int32))
with gzip.open(image_url, 'rb') as fimg:
magic, num, rows, cols = struct.unpack(">IIII", fimg.read(16))
image = nd.array(np.frombuffer(fimg.read(), dtype=np.uint8).reshape(num, rows, cols)/255)
return image, label
train_data = [
("./train-images-idx3-ubyte.gz", "./train-labels-idx1-ubyte.gz"),
("./t10k-images-idx3-ubyte.gz", "./t10k-labels-idx1-ubyte.gz")
]
for img, lbl in train_data:
image, label = read_data(lbl, img)
model = gluon.nn.Sequential()
with model.name_scope():
model.add(gluon.nn.Dense(256, activation="relu"))
model.add(gluon.nn.Dense(10))
model.collect_params().initialize()
batch_size = 100
learning_rate = 0.1
epochs = 3
loss = gluon.loss.SoftmaxCrossEntropyLoss()
trainer = gluon.Trainer(model.collect_params(),'sgd', {'learning_rate': learning_rate})
for e in range(epochs):
for i in range(0, len(image), batch_size):
data = image[i:i+batch_size].as_in_context(ctx)
label = label[i:i+batch_size].as_in_context(ctx)
with autograd.record():
output = model(data)
L = loss(output, label)
L.backward()
trainer.step(batch_size)
serialized_model = bytearray(model.export())
with open('/tmp/model.mar', mode='bw') as f:
f.write(serialized_model)
subprocess.check_call(['tar', 'cvzf', '/tmp/model.tgz', '--directory=/', '-C', '/tmp/','model.mar'])
bucket = sagemaker_session.default_bucket()
prefix ='sagemaker/DEMO-realtime-deep-learning/{}'.format(strftime('%Y-%m-%d-%H-%M-%S', gmtime()))
s3_model_location = '{}/{}/model.tgz'.format(bucket, prefix)
sagemaker_session.upload_data(path='/tmp/model.tgz', key_prefix=prefix+'/model')
with open('/tmp/entry_point.py', mode='w') as f:
f.write("""
import sagemaker_containers
from sagemaker_mxnet_container.serving import MXNetModel
from sagemaker_inference import content_types, errors
def model_fn(model_dir):
model = MXNetModel(model_dir=model_dir,
entry_point='entry_point.py',
role=sagemaker.get_execution_role(),
framework_version='1.4.1',
py_version='py3')
return model
def transform_fn(model, request_body, input_content_type, output_content_type):
if request_body is None:
raise ValueError('No request body found. There is nothing to inference.')
try:
tensor = mx.nd.array(json.loads(request_body)['inputs'][0])
prediction = model.predict(tensor)
predicted_class = int(prediction[0][0])
result = json.dumps({'predicted_class': predicted_class}).encode('utf-8')
return result, content_types.JSON
except Exception as e:
error = str(e)
return error, errors.InternalServerFailureError(error)
""")
sagemaker_session.upload_data(path='/tmp/entry_point.py', key_prefix=prefix+'/code')
代码解读
其中,在将模型保存时采用.mar文件格式后又将其打包成.tar.gz格式并上传至S3指定位置。此外还实现了transform_fn函数用于将模型推理请求转发至容器中处理。该函数通过解析 incoming JSON requests来获取数据并将其转换成MXNet张量随后利用该模型进行推断计算并将推断结果返回JSON格式供客户端使用。
完成操作后,在 notebook 界面将展示模型训练进展以及最终测试准确率。通过 SageMaker 平台的控制台功能进行访问。
至此,我们已经顺利地通过 MXNet 完成了一个手写数字识别模型的训练与部署工作。然而,在追求更高的计算效率和更好的训练效果时,请您考虑进一步优化模型架构,并尝试采用更为复杂的网络结构以充分发挥 GPU 的潜力。
