update structure (#28)

This commit is contained in:
Luo Mai
2022-03-03 08:54:20 +00:00
committed by GitHub
parent 7671dca95a
commit 1f1f009cea
46 changed files with 17 additions and 18 deletions

View File

@@ -0,0 +1,21 @@
## 保序性设计
和常规数据并行计算任务不同的是,机器学习场景下的数据并行处理为了确保实验的可复现性需要维护保序的性质。在具体实现中,我们需要保证并行数据预处理后的数据输出顺序与输入顺序保持相同(即下图中的SeqB和SeqA相同)。这确保了每一次的数据模块的结果输出顺序由数据混洗模块输出顺序唯一确定有助于用户在不同的实验之间进行比较和调试。不同的机器学习系统采用了不同的方案来确保保序性我们以MindSpore的实现为例子进行介绍以加深读者对这部分内容的理解。
![数据的保序性——确保SeqB与SeqA相同](../img/ch07/7.4/data_ordering.png)
:width:`800px`
:label:`data_order_definition`
MindSpore通过约束算子线程组间的通信行为来确保对当前算子的下游算子的输入顺序与自己的输入顺序相同基于这种递归的约束确保了整个并行数据处理最后一个算子的输出顺序与第一个算子的输入顺序相同。具体实现中MindSpore以Connector为算子线程组间的通信组件对Connector的核心操作为上游算子的Push操作以及下游算子的Pop操作我们重点关注MindSpore对这两个行为的约束。
Connector的使用有如下两个要求:
- Connector两端的数据生产线程组和数据消费线程组中的线程分别从0开始编号。
- 确保数据生产者的输入数据顺序是在各个生产者线程间为按顺序轮询分布(Round-Robin distribution), 即当生产者线程组大小为M时生产者线程0拥有第(0 + M \* k)个数据生产者线程1拥有第(1 + M \* k)生产者线程2拥有第(2 + M \* k)个数据等(其中k=0123\...)。
Connector中维护与生产者线程数目相同的队列并确保向Connector中放入数据时每个生产者线程生产的数据只放到对应编号的队列中这样可以确保Connector中的数据在不同的队列间的分布与在不同生产者线程组之间的分布相同(代码片段中的Push函数)。接着当Connector的消费者线程组从Connector中获取数据时我们需要确保最终数据在不同的消费者线程间依然为按顺序轮询分布即当消费者线程组大小为N时消费者线程0拥有第(0 + N \* k)个数据消费者线程1拥有第(1 + N \* k)消费者线程2拥有第(2 + N \* k)个数据等(其中k=0123\...)。为此当有消费者线程从Connector中请求数据时Connector在确保当前请求消费者线程编号i与待消费数据标号j符合$i=j\%N$的关系下(其中N为消费者线程数目)按照轮循的方式从各个队列中获取数据如果二者标号不符合上述关系则该请求阻塞等待。通过这种通信的约束方式MindSpore实现了保序功能。
![MindSpore保序性实现](../img/ch07/7.4/mindspore_data_order.jpeg)
:width:`800px`
:label:`mindspore_data_order_implementation`

View File

@@ -0,0 +1,97 @@
## 单机数据处理性能的扩展
上文我们介绍了通过并行架构发挥多核CPU算力来加速数据预处理以满足芯片上模型计算对于数据消费的吞吐率需求这在大部分情况下都能解决用户的问题。然而数据消费性能随着AI芯片的发展在逐年快速增长即模型计算速率在变快而主要借助CPU算力的数据模块却由于摩尔定律的逐渐终结无法享受到芯片性能提升带来的硬件红利使得数据生产的性能很难像模型计算性能一样逐年突破。不仅如此近几年AI服务器上AI芯片数量的增长速度远超CPU数量的增长速度进一步加剧了芯片的数据消费需求与数据模块的数据生产性能之间的矛盾。我们以英伟达(NVIDIA)公司生产的NVIDIA DGX系列服务器为例子DGX-1服务器中配置有40个CPU核和8个GPU芯片而到了下一代的NVIDIA DGX-2服务器时GPU芯片的数目增长了到了16个而CPU核的数目仅从40个增加到了48个。由于所有的GPU芯片在训练时共享CPU的算力故平均而言每个GPU芯片(数据消费者)能够使用的算力从NVIDIA DGX-1时的5CPU核/GPU下降到了 NVIDIA DGX-2的3CPU核/GPUCPU的算力瓶颈会导致用户使用多卡训练时无法达到预期的扩展性能。针对单机上的CPU算力不足的问题我们给出两种目前常见的两种解决方案即基于CPU+AI芯片的异构数据处理的加速方案和基于分布式数据预处理的扩展方案。
### 基于异构计算的数据预处理
由于AI芯片相比于CPU拥有更丰富的算力资源故在CPU算力成为数据预处理瓶颈时通过借助AI加速芯片来做数据预处理是一个行之有效的方案。虽然AI芯片不具备通用的数据预处理能力但是由于大部分高耗时的数据预处理都是Tensor相关的计算如语音中的快速傅立叶变换(Fast Fourier Transform, FFT)图像中的去噪等使得部分操作可以被卸载到AI芯片上来加速。如华为昇腾Ascend310芯片上的Dvpp模块为芯片内置的硬件解码器相较于CPU拥有对图形处理更强劲的性能Dvpp支持JPEG图片的解码缩放等图像处理基础操作用户实际数据预处理中可以指定部分图像处理在昇腾Ascend310芯片上完成以提升数据模块性能。
```python
namespace ms = mindspore;
namespace ds = mindspore::dataset;
// 初始化操作
//...
// 构建数据处理算子
// 1. 解码
std::shared_ptr<ds::TensorTransform> decode(new ds::vision::Decode());
// 2. 缩放
std::shared_ptr<ds::TensorTransform> resize(new ds::vision::Resize({256}));
// 3. 归一化
std::shared_ptr<ds::TensorTransform> normalize(new ds::vision::Normalize(
{0.485 * 255, 0.456 * 255, 0.406 * 255}, {0.229 * 255, 0.224 * 255, 0.225 * 255}));
// 4. 剪裁
std::shared_ptr<ds::TensorTransform> center_crop(new ds::vision::CenterCrop({224, 224}));
// 构建流水并指定使用昇腾Ascend进行计算
ds::Execute preprocessor({decode, resize, center_crop, normalize}, MapTargetDevice::kAscend310, 0);
// 执行数据处理流水
ret = preprocessor(image, &image);
```
相比较Dvpp只支持图像的部分预处理操作英伟达公司研发的DALI\[8\]是一个更加通用的基于GPU的数据预处理加速框架。DALI中包含如下三个核心概念
- DataNode:表示一组Tensor的集合
- Operator:对DataNode进行变换处理的算子一个Operator的输入和输出均为DataNode。比较特殊的是DALI中的算子可以被设置为包括cpugpumixed三种不同执行模式其中cpu模式下算子的输入输出均为cpu上的DataNodegpu模式下算子的输入输出均为gpu上的DataNode而mixed模式下的算子的输入为cpu的DataNode而输出为gpu的DataNode。
- Pipeline:用户通过Operator描述DataNode的处理变换过程而构建的数据处理流水
实际使用中用户通过设置算子的运行模式(mode)来配置算子的计算是用CPU还是GPU完成计算同时DALI中有如下限制当一个算子为mixed模式或者gpu模式时其所有的下游算子强制要求必须为gpu模式执行。
![NVIDIA DALI概览](../img/ch07/7.5/dali_overview.png)
:width:`800px`
:label:`dali_overview`
下面展示一段使用DALI构建数据处理流水线的示例代码我们从文件中读取图片数据经过混合模式的解码再经过运算在GPU上的旋转和缩放算子处理后返回给用户处理
结果。由于其展示出的优异性能,
DALI被广泛的用于高性能推理服务和多卡训练性能的优化上。
```python
import nvidia.dali as dali
pipe = dali.pipeline.Pipeline(batch_size = 3, num_threads = 2, device_id = 0)
with pipe:
files, labels = dali.fn.readers.file(file_root = "./my_file_root")
images = dali.fn.decoders.image(files, device = "mixed")
images = dali.fn.rotate(images, angle = dali.fn.random.uniform(range=(-45,45)))
images = dali.fn.resize(images, resize_x = 300, resize_y = 300)
pipe.set_outputs(images, labels)
pipe.build()
outputs = pipe.run()
```
### 基于分布式的数据预处理
分布式数据预处理是另一种解决CPU算力性能不足的可选方案。一种常见的做法是借助Spark、Dask等现有大数据计算框架进行数据预处理并将结果写入分布式文件系统而训练的机器只需要读取预处理的结果数据并进行训练即可。
![基于第三方分布式计算框架的分布式数据预处理](../img/ch07/7.5/distribute.png)
:width:`800px`
:label:`distributed_data_preprocess_based_on_3rd_party_software`
该方案虽然再业内被广泛使用,却面临着三个问题:
- 由于数据处理和数据训练采用不同的框架,使得用户为此常常需要在两个不同的框架中编写不同语言的程序,增加了用户的使用负担。
- 由于数据处理系统和机器学习两个系统间无法做零拷贝的数据共享,使得数据的序列化和反序列化常常成为不可忽视的额外开销。
- 由于大数据计算框架并不是完全针对机器学习场景,使得某些分布式预处理操作如全局的数据混洗无法被高效的实现。
为了更适配机器学习场景的数据预处理分布式机器学习框架Ray借助其自身的任务调度能力实现了简单的分布式的数据预处理------
Ray Dataset\[10\],由于数据预处理和训练处在同一个框架内,在降低了用户的编程负担的同时也通过数据的零拷贝共享消除了序列化/反序列化带来的额外开销。Ray Dataset支持如map、batch、map、filter等简单并行数据集变换算子、以及如mean等一些基础的聚合操作算子。同时Ray
Dataset也支持排序、随机打乱、GroupBy等全局混洗操作该方案目前处在研究开发中还未被广泛的采用感兴趣的读者可以翻阅相关资料进一步的了解。
```python
ray.data.read_parquet("foo.parquet") \
.filter(lambda x: x < 0) \
.map(lambda x: x**2) \
.random_shuffle() \
.write_parquet("bar.parquet")
```

View File

@@ -0,0 +1,31 @@
# 数据处理框架
在前两个章节中,我们介绍了编译器前后端的相关内容,详细地阐述了源程序到目标程序的转换优化过程。除了让芯片在训练/推理过程中高性能地运行我们还需要将数据高效地发送给芯片以实现全流程的性能最优。机器学习模型训练和推理需要从存储设备如本地磁盘和内存、远端的存储系统等中加载数据集对数据集进行一系列处理变换将处理结果发送到到GPU或者华为昇腾Ascend等加速器中完成模型计算该流程的任何一个步骤出现性能问题都会对训练和推理的吞吐率造成负面影响。本章我们将核心介绍如何设计、并实现一个面向机器学习场景的数据系统以帮助用户轻松构建各种复杂的数据处理流水线(Data
Pipeline),同时我们的数据系统要有足够高的执行性能,以确保数据预处理步骤不会成为模型训练和推理的性能瓶颈。
本章主要从易用性、高效性和保序性三个维度展开介绍机器学习系统中的数据模块。在前两个小节中我们首先讨论如何构建一个易用的数据模块。包括如何设计编程抽象使得用户通过短短几行代码便可以描述一个复杂的预处理过程以及如何做到既内置丰富算子提升易用性又可以灵活支持用户使用自定义算子覆盖长尾需求。用户构建好数据处理流程后数据模块需要负责高效的调度执行数据流水线以达到最优的数据处理吞吐率。高效的执行数据流水线是一个具有挑战性的任务我们既要面临数据读取部分的I/O性能问题又要解决数据处理部分的计算性能问题。针对上述挑战我们将分别介绍面向高吞吐率读取性能的数据文件格式设计以及能够充分发挥多核CPU算力的并行架构设计。不仅如此和常规数据并行计算任务不同的是大部分机器学习场景对于数据的输入输出顺序有着特殊的`保序性`的要求,我们将会使用一节的内容来介绍什么是保序性,以及如何在数据模块的并行架构中设计相应组件计来满足该特性需求。学习了上述的内容后,读者将会对如何构建一个面向机器学习场景高效易用的数据模块有深刻的理解。最后,作为拓展内容,我们将以目前学术界和业界的一些实践经验来介绍当单机处理性能达不到要求时,该如何去扩展我们的数据处理模块以满足训练性能需求。本章学习目标包括:
- 了解机器学习数据模块架构中的关键组件及其功能
- 了解不同数据模块用户编程接口的设计
- 掌握面向高性能数据读取的数据文件格式设计
- 掌握机器学习系统数据模块并行架构
- 掌握机器学习系统数据模块数据保序性含义及其解决方案
- 了解两种单机数据处理性能扩展方案
```toc
:maxdepth: 2
requirements
program_model
performance
data_order
extension
summary
reference
```

View File

@@ -0,0 +1,183 @@
## 高效性设计
在上一节中我们重点介绍了数据模块的编程抽象以及编程接口设计确保用户可以方便的基于我们提供的API描述数据处理流程而不需要过多关注实现和执行细节。那么本节我们将进一步探究数据加载以及流水线调度执行等数据模块关键部分设计细节以确保用户能够拥有最优的数据处理性能。同时在本节内容中我们也会贯穿现有主要机器学习系统的实践经验以帮助读者加深对这些关键设计方案的理解。
如 :numref:`async_data_process` 所示深度学习模型训练需要借助数据模块首先从存储设备中加载数据集在内存中进行一系列的预处理变换最终将处理好的数据集发送到加速器芯片上执行模型的计算目前有大量的工作都着重于研究如何通过设计新的硬件或者应用算子编译等技术加速芯片上的模型计算而在数据梳理流水的性能问题上鲜有涉及。但事实上很多情况下数据预处理的执行时间往往在整个训练任务中占据着相当大的比例导致GPU/华为昇腾Ascend等加速器无法被充分利用。研究数据表明企业内数据中心的计算任务大约有30%的计算时间花费在数据预处理步骤\[5\]也有研究发现在一些公开数据集上的模型训练任务有65%的时间都花费在了数据预处理上\[6\],由此可以看出数据模块的性能对于整体训练吞吐率有着决定性的影响。
![数据加载、预处理、模型计算异步并行执行](../img/ch07/7.3/async_data_process.png)
:width:`800px`
:label:`async_data_process`
为了追求最高的训练吞吐率现有系统一般选择将数据读取、数据预处理计算、以及芯片上的模型计算三个步骤异步并行执行。这三步构成了典型的数据生产者和数据消费者的上下游关系我们将数据从存储设备中的读取速率用F表示数据预处理速率用P表示芯片上的数据消费速率用G表示。理想情况下我们希望G < min(F, P)此时加速芯片不会因为等待数据而阻塞。然而现实情况下我们常常要么因为数据加载速率F过低(称为I/O Bound)要么因为数据预处理速率P过低(称为CPU Bound)导致G>min(F, P)而使得芯片无法被充分利用。针对上述关键性能问题,我们将在本节重点探究两个内容:
- 如何针对机器学习场景的特定I/O需求来设计相应文件格式及加载方式以优化数据读取速率F。
- 如何设计并行架构来充分发挥现代多核CPU的计算能力以提升数据处理速率P。
在本节的最后我们还会研究一个具有挑战性的问题,即如何利用我们在前几章学到的计算图的编译技术来优化用户的数据处理计算流图,以进一步达到最优的数据处理吞吐率性能。那么接下来,请读者和我们一起开启本节的头脑风暴旅程。
### 数据读取的高效性
首先我们来研究如何解决数据读取的性能挑战。我们面临的第一个问题是数据类型繁多存储格式不统一带来的I/O差异如文本数据可能存储成txt数据格式图像数据可能存储成原始格式或者如JPEG等压缩格式。我们显然无法去针对每一种存储情况都设计其最优的数据读取方案。但是我们可以通过提出一种统一的存储格式(我们称之为Unirecord格式)以屏蔽不同数据类型的I/O差异并基于这种数据格式进行数据加载方案的设计与优化而实际使用中用户只需要将其原始数据集转换存储为我们的统一数据格式便可以享受到高效的读取效率。
![统一数据格式](../img/ch07/7.3/uni_record.png)
:width:`800px`
:label:`unified_record_format`
那么我们的Unirecord除了统一用户存储格式之外还需要具备哪些特性呢机器学习模型训练中对数据的访问具有如下特点
- 每一个Epoch内以一种随机顺序遍历所有的数据且每个数据只被遍历一次
- 所有Epoch需要以不同的随机顺序遍历访问所有数据
上述的访问特性要求我们的Unirecord存储格式能够支持高效的随机读取。当我们的数据集能够全部存储在RAM中时对Unirecord的随机读取并不会成为大的问题。但是当数据集大到必须存储在本地磁盘或者分布式文件系统中时我们就需要设计特定的方案。一个直观的想法是将一个Unirecord文件分为索引块和数据块索引块中记录每个数据在文件中的大小、偏移以及一些校验值等元信息数据块存储每个数据的主体数据。当我们需要对一个Unirecord格式的文件进行随机读取时我们首先在内存中加载该文件的索引块(通常远远小于整个文件大小)并在内存中建立文件内数据的索引表接着当我们需要随机读取数据时我们首先在索引表中查询该数据在文件中的偏移、大小等信息并基于该信息从磁盘上进行读取。这样的读取方式可以满足我们在磁盘上的随机读取需求。接下来我们以MindSpore提出的MindRecord的实践经验为例子介绍统一文件格式的设计以帮助大家加深对这部分内容的理解
![支持随机读取的文件格式设计](../img/ch07/7.3/file_indexing.png)
:width:`800px`
:label:`file_random_access`
#### MindRecord介绍
MindRecord是MindSpore推出的统一数据格式目标是归一化用户的数据集优化训练数据的读取过程。该文件格式具备如下特征
- 实现多变的用户数据统一存储、访问,训练数据读取更加简便。
- 数据聚合存储,高效读取,且方便管理、移动。
- 高效的数据编解码操作,对用户透明、无感知。
- 可以灵活控制分区的大小,实现分布式训练。
和我们前文设计的Unirecord思路相似一个MindRecord文件也由数据文件和索引文件组成数据文件包含文件头、标量数据页、块数据页用于存储用户归一化后的训练数据索引文件包含基于标量数据如图像Label、图像文件名等生成的索引信息用于方便的检索、统计数据集信息。为确保对一个MindRecord文件的随机读取性能MindSpore建议单个MindRecord文件小于20G若数据集超过20G用户可在MindRecord数据集生成时指定相应参数将原始数据集分片存储为多个MindRecord文件。
![MindRecord文件格式组成](../img/ch07/7.3/MindRecord_format.png)
:width:`800px`
:label:`mindrecord_format`
一个MindRecord文件中的数据文件部分具体的关键部分的详细信息如下
- **文件头**
文件头主要用来存储文件头大小、标量数据页大小、块数据页大小、Schema信息、索引字段、统计信息、文件分区信息、标量数据与块数据对应关系等是MindRecord文件的元信息。
- **标量数据页**
标量数据页主要用来存储整型、字符串、浮点型数据如图像的Label、图像的文件名、图像的长宽等信息即适合用标量来存储的信息会保存在这里。
- **块数据页**
块数据页主要用来存储二进制串、Numpy数组等数据如二进制图像文件本身、文本转换成的字典等。
用户训练时MindRecord的读取器能基于索引文件快速的定位找到数据所在的位置并将其读取解码出来。另外MindRecord具备一定的检索能力用户可以通过指定查询条件筛选获取符合期望的数据样本。
对于分布式训练场景MindRecord会基于数据文件中Header及索引文件进行元数据的加载得到所有样本的ID及样本在数据文件中的偏移信息然后根据用户输入的num_shards训练节点数和shard_id当前节点号进行数据的partition得到当前节点的num_shards分之一的数据分布式训练时多个节点只读取数据集的num_shards分之一借由计算侧的AllReduce实现整个数据集训练的效果。进一步如果用户开启shuffle操作那么每epoch保证所有节点shuffle seed保持一致那么对所有样本的ID shuffle结果是一致的那么数据partition的结果就是正确的。
![MindRecord Partition策略](../img/ch07/7.3/partition.png)
:width:`800px`
:label:`mindrecord_partition`
### 数据计算的高效性
解决了数据读取性能问题后,我们继续来研究数据计算的性能提升(即最大化上文中的数据处理速率P)。我们以上文提及的数据预处理流水为例子、来研究如何设计数据模块对用户计算图的调度执行以达到最优的性能。
![数据预处理流程串行顺序执行示意图](../img/ch07/7.3/single_pipeline.png)
:width:`800px`
:label:`serialized_data_process`
由于深度学习芯片如GPU/华为昇腾Ascend等并不具备通用数据处理的能力
我们目前还是主要依赖CPU来完成预处理计算。主流的AI服务器大多具备多个多核CPU数据模块需要设计合理的并行架构充分发挥多核算力以提升数据预处理性能达到尽可能减少加速器由于等待数据而阻塞的目的。本节中我们将介绍流水线粒度并行以及算子粒度并行两种常见的并行架构。流水线并行的方式结构清晰易于理解和实现主要被Pytorch这样基于Python实现数据模块的机器学习系统所采用。受到经典数据并行系统调度执行架构设计的影响其他如Google的TensorFlow以及华为的MindSpore等系统主要采用算子粒度并行做精细CPU算力分配以达到充分利用多核算力的目的。然而精细的分配意味着我们需要对所有数据处理流程中涉及的算子设置合理的并行参数这对用户而言是一个较大的挑战。于是MindSpore等框架又提供数据流图中关键参数自动调优的功能通过运行时的动态分析自动搜索得到最优的算子并行度等参数极大的减少了用户的编程负担。接下来我们一一展开讨论。
#### 流水线并行
第一种常见的并行方案为流水线粒度的并行,即我们把用户构建的计算流水在一个线程/进程内顺序串行执行,同时启动多个线程/进程并行执行多个流水线。假设用户总共需要处理N个数据样本那么当流水线并行度为M时每个进程/线程只需要执行处理(N/M)个样本。流水线并行架构结构简单,易于实现。整个并行架构中各个执行进程/线程只需要在数据执行的开始和结束进行跨进程/线程的通信即可,数据模块将待处理的数据任务分配给各个流水线进程/线程并在最终进行结果汇总发送到芯片上进行模型计算。从用户的角度而言使用也相对方便只需要指定关键的并行度参数即可。接下来我们以Pytorch为例子进行详细展开。
![流水线级别并行执行示意图](../img/ch07/7.3/pipeline_parallisim.png)
:width:`800px`
:label:`pipeline_parallisim`
在Pytorch中用户只需要实现一个Dataset的Python类编写数据处理过程Dataloader通过用户指定的并行度参数num_workers来启动相应个数的Python进程调用用户自定义的Dataset类进行数据预处理。Dataloader中的进程有两类角色worker进程以及主进程以及两类进程间通信队列index_queue以及worker_result_queue。训练过程中主进程负责将待处理数据任务列表通过index_queue发送给各个worker进程每个woker进程执行用户编写的Dataset类的数据预处理逻辑并将处理后的结果通过worker_result_queue返回给主进程。
![Pytorch Dataloader并行执行架构](../img/ch07/7.3/pytorch_dataloader.png)
:width:`800px`
:label:`pytorch_dataloader`
接下来我们展示一段用户使用Pytorch的Dataloader进行并行数据预处理的代码片段可以发现我们只需要实现Dataset类描述数据预处理逻辑并指定num_workers即可实现流水线粒度的并行数据预处理。
```python
# 描述数据预处理流程
class TensorDataset:
def __init__(self, inps):
sef.inps = inps
def __getitem__(self, idx):
data = self.inps[idx]
data = data + 1
return data
def __len__(self):
return self.inps.shape[0]
inps = torch.arange(10 * 5, dtype=torch.float32).view(10, 5)
dataset = TensorDataset(inps)
# 指定并行度为3
loader = DataLoader(dataset, batch_size=2, num_workers=3)
for batch_idx, sample in enumerate(loader):
print(sample)
```
最后需要指出的是, Pytorch Dataloader的执行过程中涉及大量进程间通信虽然为了加速这一步骤Pytorch对Tensor类数据实现了基于共享内存的进程间通信机制。然而当通信数据量较大时跨进程通信仍然会较大地影响端到端的数据预处理吞吐率性能。当然这不是流水线并行自身的架构问题而是由于CPython的全局解释器锁(Global Interpreter Lock, GIL)导致在Python层面实现流水线并行时只能采用进程并行。为了解决这个问题目前Pytorch团队也在尝试通过移除CPython中的GIL来达到基于多线程实现流水线并行以提升通信效率的目的\[7\],感兴趣的读者可以选择继续深入了解。
#### 算子并行
流水线并行中算力(CPU核心)的分配以流水线为粒度相对而言以算子为计算资源分配粒度的算子并行是一种追求更精细算力分配的并行方案。我们期望对计算耗时高的算子分配更高的并行度计算耗时低的算子分配更低的并行度以达到更加高效合理的CPU算力使用。算子并行想法和经典的数据并行计算系统的并行方式一脉相承我们以经典的MapReduce计算执行为例子我们发现这也可以认为是一种算子并行(map算子和reduce算子),其中map算子的并行度和reduce算子的并行度根据各个算子阶段的计算耗时而决定。
![MapReduce经典并行执行架构](../img/ch07/7.3/map_reduce.png)
:width:`800px`
:label:`mapreduce`
下图中我们给出本节开头数据预处理流程的算子并行架构示意图我们根据各个算子的计算耗时设置图片解码算子并行度为3图片缩放并行度为2图片随机旋转算子并行度为4图片归一化算子并行度为3以及图像通道转置算子并行度为1。我们期望通过给不同耗时的算子精准的分配算力以达到算力高效充分的利用。具体实现中算子并行一般采用线程级并行所有的算子使用线程间队列等方法进行共享内存通信。
![算子并行执行架构](../img/ch07/7.3/operator_parallisim.png)
:width:`800px`
:label:`operator_parallisim`
现有机器学习系统的数据模块中tf.data以及MindData均采用了算子并行的方案。由于对算力的利用更加充分、以及基于C++的高效数据流调度实现算子并行的方案往往展示出更好的性能tf.data的性能评测表明其相比较Pytorch的Dataloader有近两倍的性能优势\[5\]。
接下来我们以一段基于MindSpore实现本节开篇的数据预处理流程来展示如何在一个算子并行的数据流水线中设置各个算子的并行度。
```python
import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as c_transforms
import mindspore.dataset.transforms.vision.c_transforms as vision
# 读取数据
dataset_dir = "path/to/imagefolder_directory"
dataset = ds.ImageFolderDatasetV2(dataset_dir, num_parallel_workers=8)
transforms_list = [vision.Decode(),
vision.Resize((256, 256)),
vision.RandomRotation((0, 15)),
vision.Normalize((100, 115.0, 121.0), (71.0, 68.0, 70.0)),
vision.HWC2CHW()]
onehot_op = c_transforms.OneHot(num_classes)
# 解码算子并行度为3
dataset = dataset.map(input_columns="image", operations=vision.Decode(), num_parallel_workers=3)
# 缩放算子并行度为2
dataset = dataset.map(input_columns="image", operations=vision.Resize((256, 256)), num_parallel_workers=2)
# 随机旋转算子并行度为4
dataset = dataset.map(input_columns="image", operations=vision.RandomRotation((0, 15)), num_parallel_workers=4)
# 正规化算子并行度为3
dataset = dataset.map(input_columns="image", operations=vision.Normalize((100, 115.0, 121.0), (71.0, 68.0, 70.0)), num_parallel_workers=3)
# 通道转置算子并行度为1
dataset = dataset.map(input_columns="image", operations=vision.HWC2CHW(), num_parallel_workers=1)
dataset = dataset.map(input_columns="label", operations=onehot_op)
```
我们发现虽然算子并行具备更高的性能潜力但却需要我们对每一个算子设置合理并行参数。这不仅对用户提出了较高的要求同时也增加了由于不合理的并行参数设置导致性能反而下降的风险。为了让用户更加轻松的使用算子并行tf.data和MindData都增加了流水线关键参数动态调优功能基于对流水线执行时的性能监控计算得到合理的参数以尽可能达到最优的数据预处理吞吐率\[5\]。
#### 数据处理计算图优化
在前文中,我们专注于通过并行架构来高效执行用户构建的数据预处理计算图。但我们可以思考如下问题:用户给定的计算图是否是一个高效的计算图?
如果不高效我们是否能够在保证等价变换的前提下将用户的数据计算图进行优化重写得到执行性能预期更好的计算图没错这和我们在前几章中学习的模型计算图编译优化有着相同的思想即通过分析变换计算图IR得到更优的IR表示来达到更好的执行性能。常用的数据图优化策略有算子融合以及map操作向量化两种。算子融合将map+map、map+batch、map+filter、filter+filter等算子组合融合成一个等价复合算子将原先需要在两个线程组中执行的计算融合为在一个线程组中执行的复合计算减少线程间的同步通信开销达到了更优的性能。而map操作向量化则将常见的dataset.map(f).batch(b)操作组合变换调整为dataset.batch(b).map(parallel_for(f))借助现代CPU的对并行操作更友好的SIMD指令集来加速数据预处理。

View File

@@ -0,0 +1,122 @@
## 易用性设计
本节我们主要介绍如何设计一个易用的机器学习系统数据模块。正如前文所言,易用性既要求数据模块提供好的编程抽象和接口使得用户可以方便的构建一个数据处理流水,同时还要支持用户灵活地在数据流水中注册使用自定义算子以满足丰富多变的特殊需求,接下来我们将从编程接口抽象和自定义算子注册机制两个方面来展开探讨。
### 编程抽象与接口
:numref:`image_process_pipeline` 我们展示的是一个训练图片分类模型的经典数据预处理流水线。我们从存储设备中加载数据集后对数据集中的图片数据进行解码、缩放、旋转、正规化、通道变换等一系列操作对数据集的标签也进行特定的预处理操作最终将处理好的数据发送到芯片上进行模型的计算。我们希望数据模块提供的编程抽象具备足够高的层次以使得用户可以通过短短几行代码就能描述清楚数据处理的逻辑不需要陷入过度的、重复的数据处理实现细节当中。同时又要确保这一套高层次的抽象具备足够通用性以满足多样的数据预处理需求。在我们得到一个好的编程抽象后我们将会以基于MindSpore的数据模块提供的编程接口实现下图所描述的数据预处理流水线的代码片段为例子来展示一个优秀的编程抽象对用户编程负担的减轻是有多么大的作用。
![数据预处理示例](../img/ch07/7.2/image_process_pipeline.png)
:width:`800px`
:label:`image_process_pipeline`
事实上,面向数据计算的编程抽象早已在通用数据并行计算系统领域中被广泛的研究并取得了相对统一的共识------那就是提供类LINQ式\[1\]的编程抽象其最大的特点是让用户专注于描述基于数据集的生成与变换而将这些操作的高效实现与调度执行交由数据系统的运行时负责。一些优秀的系统如Naiad\[2\],
Spark\[3\], DryadLINQ\[4\]等都采用了这种编程模型。我们以Spark为例子进行简要介绍。
Spark向用户提供了基于弹性分布式数据集(Resilient Distributed Dataset, RDD)概念的编程模型。一个RDD是一个只读的分布式数据集合用户通过Spark的编程接口来主要描述RDD的创建及变换过程我们以一个Spark示例进行展开讨论。下面展示了一段在一个日志文件中统计包含ERROR字段的行数的Spark代码我们首先通过文件读取创建一个分布式的数据集file前文提到RDD表示数据的集合这里的file实际上是日志行的数据集合
我们对这个file数据集进行filter(过滤)运算得到新的只保留包含ERROR字段的日志行的数据集errs接着我们对errs中的每一个数据进行map(映射)操作得到数据集ones最后我们对ones数据集进行reduce操作得到了我们最终想要的统计结果即file数据集中包含ERROR字段的日志行数。
```java
val file = spark.textFile("hdfs://...")
val errs = file.filter(_.contains("ERROR"))
val ones = errs.map(_ => 1)
val count = ones.reduce(_+_)
```
我们发现用户只需要四行代码就完成了在这样一个分布式的数据集中统计特定字段行数的复杂任务这得益于Spark核心的RDD编程抽象从 :numref:`rdd_transformation_example`的计算流程可视化中我们也可以清晰的看到用户在创建数据集后,只需要描述在数据集上的作用算子即可,至于算子的执行和实现则由系统的运行时负责。
![Spark编程核心------RDD变换](../img//ch07/7.2/RDD.png)
:width:`800px`
:label:`rdd_transformation_example`
主流机器学习系统中的数据模块同样也采用了类似的编程抽象如TensorFlow的数据模块tf.data\[5\],
以及MindSpore的数据模块MindData等。接下来我们以MindData的接口设计为例子来介绍如何面向机器学习这个场景设计好的编程抽象来帮助用户方便的构建模型训练中多种多样的数据处理流水线。
MindData是机器学习系统MindSpore的数据模块主要负责完成机器学习模型训练中的数据预处理任务MindData的向用户提供的核心编程抽象为基于Dataset数据集的变换处理。这里的Dataset是一个数据帧的概念(Data
Frame)即一个Dataset为一个多行多列且每一列都有列名的关系数据表。
![MindSpore
Dataset示例](../img/ch07/7.2/dataset_table.png)
:width:`800px`
:label:`mindspore dataset example`
基于这样一个编程模型结合我们在第一节中介绍的机器学习数据流程中的关键处理流程MindData为用户提供了对数据集进行shuffle、map、batch等变换操作的数据集操作算子这些算子接收一个Dataset作为输入并以一个新处理生成的Dataset作为结果输出我们列举典型的数据集变换接口如下
:MindSpore支持的数据集操作接口
| 数据集操作 | 含义解释 |
| -------------------- | -------------------------------------------------------- |
| batch | 将数据集中的多行数据项组成一个mini-batch |
| map | 对数据集中的每行数据进行变换操作 |
| shuffle | 随机打乱数据集中的数据行的顺序 |
| filter | 对数据集的数据行进行过滤操作,只保留通过过滤条件的数据行 |
| prefetch | 从存储介质中预取数据集 |
| project | 从Dataset数据表中选择一些列用于接下来的处理 |
| zip | 将多个数据集合并为一个数据集 |
| repeat | 多轮次训练中,重复整个数据流水多次。 |
| create_dict_iterator | 对数据集创建一个返回字典类型数据的迭代器。 |
| ... | ... |
上述描述了数据集的接口抽象而对数据集的具体操作实际上是由具体的数据算子函数定义。为了方便用户使用MindData对机器学习领域常见的数据类型及其常见数据处理需求都内置实现了丰富的数据算子库。针对视觉领域MindData提供了常见的如Decode(解码)、Resize缩放、RandomRotation随机旋转、Normalize(正规化)以及HWC2CHW通道转置等算子针对文本领域MindData提供了Ngram、NormalizeUTF8、BertTokenizer等算子针对语音领域MindData提供了TimeMasking时域掩盖、LowpassBiquad双二阶滤波器、ComplexNorm归一化等算子这些常用算子能覆盖用户的绝大部分需求。
除了支持灵活的Dataset变换针对数据集种类繁多、格式与组织各异的难题MindData还提供了灵活的Dataset创建主要分为如下三类
- 通过内置数据集直接创建MindData内置丰富的经典数据集如CelebADataset、Cifar10Dataset、CocoDataset、ImageFolderDataset、MnistDataset、VOCDataset等。如果用户需要使用这些常用数据集可通过一行代码即可实现数据集的开箱使用。同时MindData对这些数据集的加载进行了高效的实现以确保用户能够享受到最好的读取性能。
- 从MindRecord中加载创建MindRecord为MindData设计的一种高性能通用数据存储文件格式用户可将数据集转换为MindRecord后借助MindSpore的相关API进行高效的读取。
- 从Python类创建如果用户已经有自己数据集的Python读取类那么可以通过MindData的GeneratorDataset接口调用该Python类实现Dataset的创建这给用户提供了极大的自由度。
![MindSpore
Dataset多种生成方式](../img/ch07/7.2/dataset.png)
最后我们以一个基于MindData实现我们本节开篇所描述的数据处理流水线为例子来展示以Dataset为核心概念的数据编程抽象是多么的用户友好。我们只需要短短10余行代码即可完成我们所期望的复杂数据处理同时在整个过程中我们只专注于逻辑的描述而将算子的实现和算子执行流程交由数据模块负责这极大的减轻了用户的编程负担。
```python
import mindspore.dataset as ds
import mindspore.dataset.transforms.c_transforms as c_transforms
import mindspore.dataset.transforms.vision.c_transforms as vision
dataset_dir = "path/to/imagefolder_directory"
# create a dataset that reads all files in dataset_dir with 8 threads
dataset = ds.ImageFolderDatasetV2(dataset_dir, num_parallel_workers=8)
#create a list of transformations to be applied to the image data
transforms_list = [vision.Decode(),
vision.Resize((256, 256)),
vision.RandomRotation((0, 15)),
vision.Normalize((100, 115.0, 121.0), (71.0, 68.0, 70.0)),
vision.HWC2CHW()]
onehot_op = c_transforms.OneHot(num_classes)
# apply the transform to the dataset through dataset.map()
dataset = dataset.map(input_columns="image", operations=transforms_list)
dataset = dataset.map(input_columns="label", operations=onehot_op)
```
### 自定义算子支持
有了基于数据集变换的编程抽象、以及针对机器学习各种数据类型的丰富变换算子支持,我们可以覆盖用户绝大部分的数据处理需求。然而由于机器学习领域本身进展快速,新的数据处理需求不断涌现,可能会有用户想要使用的数据变换算子没有被数据模块覆盖支持到的情况发生。为此我们需要设计良好的用户自定义算子注册机制,使得用户可以方便在构建数据处理流水线时使用自定义的算子。
机器学习场景中用户的开发编程语言以Python为主所以我们可以认为用户的自定义算子更多情况下实际上是一个Python函数或者Python类。数据模块支持自定义算子的难度主要由数据模块对计算的调度实现方式有关系比如Pytorch的dataloader的计算调度主要在Python层面实现得益于Python语言的灵活性在dataloader的数据流水中插入自定义的算子相对来说比较容易而像TensorFlow的tf.data以及MindSpore的MindData的计算调度主要在C++层面实现这使得数据模块想要灵活的在数据流中插入用户定义的Python算子变得较为有挑战性。接下来我们以MindData中的算子自定义算子注册使用实现为例子展开讨论这部分内容。
![MindData的C层算子和Python层算子](../img/ch07/7.2/operation.png)
:width:`800px`
:label:`mindspore operator example`
MindData中的数据预处理算子可以分为C层算子以及Python层算子C层算子能提供较高的执行性能而Python层算子可以很方便借助丰富的第三方Python包进行开发。为了灵活地覆盖更多场景MindData支持用户使用Python开发自定义算子如果用户追求更高的性能MindData也支持用户将开发的C层算子编译后以插件的形式注册到MindSpore的数据处理中进行调用。
对于用户传入map、filter等数据集变换算子中的自定义数据处理算子MindData的Pipeline启动后会通过创建的Python运行时来执行。需要指出的是自定义的Python算子需要保证需要保一个或多个输入、输出均是numpy.ndarray类型。具体执行过程中当MindData的Pipeline的数据集变换中执行用户自定义的PyFunc算子时会将输入数据以numpy.ndarray的类型传递给用户的PyFunc自定义算子执行完毕后再以numpy.ndarray返回给MindData在此期间正在执行的数据集变换算子如map、filter等负责该PyFunc的运行时生命周期及异常判断。如果用户追求更高的性能MindData也支持用户自定义C算子。dataset-plugin仓插件仓\[10\]为MindData的算子插件仓囊括了为特定领域遥感医疗气象等量身制作的算子该仓承载MindData的插件能力扩展为用户编写MindData的新算子提供了便捷易用的入口用户通过编写算子、编译、安装插件步骤然后就可以在MindData
Pipeline的map操作中使用新开发的算子。
![MindSpore自定义算子注册](../img/ch07/7.2/dataset-plugin.png)
:width:`800px`
:label:`mindspore_user_defined_operator`

View File

@@ -0,0 +1,34 @@
## 引用
\[1\] Meijer, E., Beckman, B., & Bierman, G. (2006, June). Linq:
reconciling object, relations and xml in the. net framework. In
Proceedings of the 2006 ACM SIGMOD international conference on
Management of data (pp. 706-706).
\[2\] Murray, D. G., McSherry, F., Isaacs, R., Isard, M., Barham, P., &
Abadi, M. (2013, November). Naiad: a timely dataflow system. In
Proceedings of the Twenty-Fourth ACM Symposium on Operating Systems
Principles (pp. 439-455).
\[3\] Zaharia, M., Chowdhury, M., Franklin, M. J., Shenker, S., & Stoica,
I. (2010). Spark: Cluster computing with working sets. HotCloud,
10(10-10), 95.
\[4\] Fetterly, Y. Y. M. I. D., Budiu, M., Erlingsson, Ú., & Currey, P.
K. G. J. (2009). DryadLINQ: A system for general-purpose distributed
data-parallel computing using a high-level language. Proc. LSDS-IR, 8.
\[5\] Murray, D. G., Simsa, J., Klimovic, A., & Indyk, I. (2021). tf.
data: A Machine Learning Data Processing Framework. arXiv preprint
arXiv:2101.12127.
\[6\] Mohan, J., Phanishayee, A., Raniwala, A., & Chidambaram, V. (2020).
Analyzing and mitigating data stalls in DNN training. arXiv preprint
arXiv:2007.06775.
\[7\] https://docs.google.com/document/d/18CXhDb1ygxg-YXNBJNzfzZsDFosB5e6BfnXLlejd9l0/edit#.
\[8\] https://github.com/NVIDIA/DALI.
\[9\] https://docs.ray.io/en/latest/data/dataset.html.
\[10\] https://gitee.com/mindspore/dataset-plugin.

View File

@@ -0,0 +1,33 @@
## 概述
机器学习场景中的数据处理是一个典型的ETLExtract, Transform,
Load过程第一个阶段Extract需要从存储设备中加载数据集第二个阶段Transform完成对数据集的变换处理。虽然不同的机器学习系统在构建数据模块时采用了不同的技术方案但其核心都会包含数据加载、数据混洗、数据变换、数据mini-batch组装以及数据发送等关键组件。其中每个组件的功能介绍如下所示
- **数据加载组件(Load)**:负责从存储设备中加载读取数据集,需要同时考虑存储设备的多样性(如本地磁盘/内存远端磁盘和内存等和数据集格式的多样性如csv格式txt格式等。根据机器学习任务的特点AI框架也提出了统一的数据存储格式如谷歌TFRecord,
华为MindRecord等以提供更高性能的数据读取。
- **数据混洗组件(Shuffle)**:负责将输入数据的顺序按照用户指定方式随机打乱,以提升模型的鲁棒性。
- **数据变换组件(Map)**:负责完成数据的变换处理,内置面向各种数据类型的常见预处理算子,如图像中的尺寸缩放和翻转,音频中的随机加噪和变调、文本处理中的停词去除和随机遮盖(Mask)等。
- **数据组装组件(Batch)**:负责组装构造一个批次(mini-batch)的数据发送给训练/推理。
- **数据发送组件(Send)**负责将处理后的数据发送到GPU/华为昇腾Ascend等加速器中以进行后续的模型计算和更新。高性能的数据模块往往选择将数据向设备的搬运与加速器中的计算异步执行以提升整个训练的吞吐率。
![数据模块的核心组件](../img/ch07/7.1/pipeline.png)
:width:`800px`
:label:`pipeline`
实现上述的组件只是数据模块的基础,我们还要对如下方面进行重点设计:
#### 易用性
AI模型训练/推理过程中涉及到的数据处理非常灵活一方面不同的应用场景中数据集类型千差万别特点各异在加载数据集时数据模块要支持图像、文本、音频、视频等多种类型的特定存储格式还要支持内存、本地磁盘、分布式文件系统以及对象存储系统等多种存储设备类型模块需要对上述复杂情况下数据加载中的IO差异进行抽象统一减少用户的学习成本。另一方面不同的数据类型往往也有着不同的数据处理需求。现有常见机器学习任务中图像任务常常对图像进行缩放、翻转、模糊化等处理文本任务需要对文本进行切分、向量化等操作而语音任务需要对语音进行快速傅立叶变换、混响增强、变频等预处理。为帮助用户解决绝大部分场景下的数据处理需求数据模块需要支持足够丰富的面向各种类型的数据预处理算子。然而新的算法和数据处理需求在不断快速涌现我们需要支持用户在数据模块中方便的使用自定义处理算子以应对数据模块未覆盖到的场景达到灵活性和高效性的最佳平衡。
#### 高效性
由于GPU/华为昇腾Ascend等常见AI加速器主要面向Tensor数据类型计算并不具备通用的数据处理能力现有主流机器学习系统数据模块通常选择使用CPU进行数据流水线的执行。理想情况下在每个训练迭代步开始之前数据模块都需要将数据准备好、以减少加速器因为等待数据而阻塞的时间消耗。然而数据流水线中的数据加载和数据预处理常常面临着具有挑战性的I/O性能性能和CPU计算性能问题数据模块需要设计具备支持随机读取且具备高读取吞吐率的文件格式来解决数据读取瓶颈问题同时还需要设计合理的并行架构来高效的执行数据流水线以解决计算性能问题。为达到高性能的训练吞吐率主流机器学习系统均采用数据处理与模型计算进行异步执行以掩盖数据预处理的延迟。
#### 保序性
和常规的数据并行计算任务所不同的是,机器学习模型训练对数据输入顺序敏感。使用随机梯度下降算法训练模型时,通常在每一轮需要按照一种伪随机顺序向模型输入数据,并且在多轮训练(Epoch)中每一轮按照不同的随机顺序向模型输入数据。由于模型最终的参数对输入数据的顺序敏感,为了帮助用户更好的调试和确保不同次实验的可复现性,我们需要在系统中设计相应机制使得数据最终送入模型的顺序由数据混洗组件的数据输出顺序唯一确定,不会由于并行数据变换而带来最终数据模块的数据输出顺序不确定。我们将在后文中对于保序性的要求和具体实现细节展开探讨。

View File

@@ -0,0 +1,3 @@
## 章节总结
本章我们围绕着易用性、高效性和保序性三个维度展开研究如何设计实现机器学习系统中的数据预处理模块。在易用性维度我们重点探讨了数据模块的编程模型通过借鉴历史上优秀的并行数据处理系统的设计经验我们认为基于描述数据集变换的编程抽象较为适合作为数据模块的编程模型在具体的系统实现中我们不仅要在上述的编程模型的基础上提供足够多内置算子方便的用户的数据预处理编程同时还要考虑如何支持用户方便的使用自定义算子。在高效性方面我们从数据读取和计算、两个分别介绍了特殊文件格式设计和计算并行架构设计。我们也使用我们在前几章中学习到的模型计算图编译优化技术来优化用户的数据预处理计算图以进一步的达到更高的数据处理吞吐率。机器学习场景中模型对数据输入顺序敏感于是衍生出来保序性这一特殊性质我们在本章中对此进行了分析并通过MindSpore中的Connector的特殊约束实现来展示真实系统实现中如何确保保序性。最后我们也针对部分情况下单机CPU数据预处理性能的问题介绍了当前基于异构处理加速的纵向扩展方案和基于分布式数据预处理的横向扩展方案我们相信读者学习了本章后能够对机器学习系统中的数据模块有深刻的认知也对数据模块未来面临的挑战有所了解。