Files
openmlsys-zh/chapter_distributed_training/collective.md
Peiyuan Liao 3788ff67ad [内容补充与拓展]集合通信 (#334)
* add initial content on collective communication

* Update mlsys.bib

* update megatron-lm/dall-e citations

* [collective] basic definition

* Update collective.md

* [collective] Broadcast

* [collective] reduce

* [collective] Reduce, Allreduce, Gather, All Gather, Scatter, ReduceScatter

* [collective] reorganize op section

* Update collective.md

* [collective] format

* [collective] calculating bandwidth

* [collective] ZeRO

* [collective] ZeRO and DALL-E

* Update collective.md

* [collective] remove topology section

* [collective] ZeRO and DALL-E

* [collective] abstraction

* Update collective.md

* [collective] abstractions & allreduce to extension

* [collective] bandwidth calculation

* [collective] move comm interface to summary

* [collective] typo

* [collective] typo

* Update mlsys.bib

* Update references (#335)

* update ch03 (#338)

* update (#339)

Co-authored-by: Jiankai-Sun <jkaisun1@gmail.com>

* Fix ch10 figures (#341)

* fix #264

* Fix figures

* Add extended readings (fix #282)

* Remove extra spaces

* Fix typo

* fix #183

* update fonts in figures

* fix #184 #263

* fix #184 #263

* fix a bug

* fix a bug

* fix 183

* fix a bug

* fix a text

* Merge

* add overview figure fix #263

* fix #263

* fix the overview figure

Co-authored-by: Dalong <39682259+eedalong@users.noreply.github.com>

* Recsys fix (#340)

* fix text (#325)

* fix reference

* update images of explainable ai (#267) (#328)

* update explainable ai

* update explainable ai

* fix citation errors (#60)

* fix reference error

* update explainable ai

* update explainable ai

* fix citation errors (#60)

* fix reference error

* fetch upstream

* update explainable ai

* fix citation errors (#60)

* fix reference error

* update explainable ai

* remove redundant content

* update img of explainable AI(#267)

* fix bug in mlsys.bib

* fix bug2 in mlsys.bib

* rewrite mlsys.bib

Co-authored-by: lhy <hlicn@connect.ust.hk>
Co-authored-by: Dalong <39682259+eedalong@users.noreply.github.com>
Co-authored-by: Luo Mai <luo.mai.cs@gmail.com>

* 删除6.2.1小节标题中无效的图片路径 (#337)

6.2.1小节标题中的图片引用在下文出现了,删除该小节标题中无效的图片路径

Co-authored-by: Luo Mai <luo.mai.cs@gmail.com>
Co-authored-by: Cheng Lai <laicheng_VIP@163.com>

* add extension (#331)

Co-authored-by: Luo Mai <luo.mai.cs@gmail.com>

* add explainable extension (#343)

Co-authored-by: lixiaohui <lixiaohui33@huawei.com>
Co-authored-by: Luo Mai <luo.mai.cs@gmail.com>

* Update RL chapter (#349)

* fix chap12 render

* add distributed rl chapter

* fix bug

* fix issue #212

* fix typo

* update imgs

* fix chinese

* fix svg img

* update contents in rl chapter

* update marl sys

* fix a fig

* fix ref

* fix error

Co-authored-by: Dalong <39682259+eedalong@users.noreply.github.com>

* [collevtive] add references

* [collective] fix references & add equations

* [collective] fix reference and inline comments

* [collective] fix code

* Update collective.md

Co-authored-by: Cheng Lai <laicheng_VIP@163.com>
Co-authored-by: Jiarong Han <73918561+hanjr92@users.noreply.github.com>
Co-authored-by: Jack <sjkai1@126.com>
Co-authored-by: Jiankai-Sun <jkaisun1@gmail.com>
Co-authored-by: Yao Fu <fy38607203@163.com>
Co-authored-by: Dalong <39682259+eedalong@users.noreply.github.com>
Co-authored-by: HaoyangLI <417493727@qq.com>
Co-authored-by: lhy <hlicn@connect.ust.hk>
Co-authored-by: Luo Mai <luo.mai.cs@gmail.com>
Co-authored-by: theseed <feiyuxin1000@sina.com>
Co-authored-by: huygens12 <59854698+huygens12@users.noreply.github.com>
Co-authored-by: lixiaohui <lixiaohui33@huawei.com>
Co-authored-by: Zihan Ding <1402434478@qq.com>
2022-05-23 13:34:50 -04:00

32 KiB
Raw Blame History

集合通信

作为并行计算中的一个重要概念,集合通信算子经常会被用来构建单程序流/多数据流编程环境single program-multiple data, SPMD中的许多交互模式。近年来该领域无论是在对不同硬件架构的支持还是算法性能的发展上都成果颇丰而因SPMD在大型深度学习系统中与数据并行的深厚联系这些框架也在其中受益匪浅。因此相比点对点 (Point-to-Point, p2p) 通信我们有更大的兴趣去探讨如何高效地在数据中心Data Centers中实现这些集合通信范式。首先我们会介绍一些集合通信中常见的算子一个经典的利用All算法解决分布式训练系统中网络瓶颈的示例探讨该算法在不同网络拓扑结构下的差异性以及一些重要指标算法带宽总线带宽的计算方法最后简略介绍现有机器学习系统对不同集合通信算法的支持。

常见算子

在分布式内存模型Distributed Memory Model一些常见的进程间数据交互模式由硬件支持和并行算法的内在性质而涌现。因此主流的并行计算架构标准例如MPI和机器学习系统的底层集合通信库例如glooNCCL通常会支持数个经典的算子并针对其做优化一般包括BroadcastReduceAllGatherReduceScatter 和 AllReduce。在一个基于 :cite:Sanders2019-cq 的简化理论模型下,可以对这些算子的特性进行简单的介绍并探讨具体的实现方法和计算开销。

基本定义

首先假定一个简化后的分布式内存模型存在p个随机存取存储器Random Access Machines, RAM作为基础的处理单元Processing Element, PE),并由一个网络来连接所有的机器。每个处理单元有自己的独立内存,并且所有的处理单元间的通信都通过网络传输。同时,每个处理单元都知道自己的编号$i$,通常在$1$到$p$之间。 网络之间的通信在最底层的情况下均为点对点的全双工通信full-duplex point-to-point communication)

  • 每次通信有且仅有一个发送者sender和一个接收者receiver
  • 在某个特定时刻,每个处理单元仅能至多发送或接收一个信息。但是,在网络中可以同时传输多个信息。每个处理单元也可以在发送一个信息的同时接收一个信息。
  • 传输一个长度为l的信息会花费$a+bl$的时间,其中$a$代表延迟latency即单位信息通过网络从一个处理单元出发到达另一个处理单元所需的时间$b$代表传输延迟transmission delay即把单位信息从处理单元中放到网络通信单元所需的时间。前者的大小一般取决于两个处理单元间的物理距离同一个机架同一个数据中心横跨全球等而后者的大小一般取决于通信网络的带宽。在这个模型下假定所有处理单元之间的a和b均为恒定值。
  • 通信可以指定一个发送者或者一个接收者由于每个存储单元都有相对应的编号我们可以定义两个函数send(i,l) 和receive(i,l)。其中send函数会把信息l从当前的处理单元发送至编号为i的处理单元而receive函数会从编号为i的处理单元接收信息l。在调用send函数时处理单元必须同时调用receive来保证编号为i的处理单元收到了该信息。因此也可以说send和receive 同步synchronize了发送者和接收者。
  • 作为拓展我们也可以定义上述函数的一个变种i = send(m) 和 i = receive(m),即在传输信息时不规定发送者或接收者。这种情况下,网络中的任意一个处理单元都可以发送或接收该信息,而最终完成传输的处理单元的编号会作为函数的返回值。
  • 虽然在现实生活中错误fault时常发生但是在这个模型里暂不考虑通信丢失dropped message和通信毁坏corrupted message的情况。

分布式内存模型中对于通信同步和传输的结合使得在这个理论模型下开发的代码更好维护。额外的,由于这个框架下提出的算法往往会产生一些很有规律的,包含了网络中所有处理单元的交互模式,通常会在最基础的点对点通信上维护一个算子库,用来归纳总结这些高效且更易于理解的算法,我们将其称为集合通信算子。

Broadcast

在SPMD中最常见的一个交互模式经常是把一个位于处理单元i的信息发送到全部其他的节点用于同步某种全局的变量或者参数。为此Broadcast算子可以定义为从编号为$i$的处理单元发送长度为$l$的信息给全部剩余的$p-1$个处理单元。在这里,一种简单的方法是在一个循环中使用$p-1$次send/receive来实现Broadcast但这并不能很好地利用通信可并行化的特质该算法只有$(a+bl)(p-1)$的线性时间复杂度。为此我们可以利用分治思想divide-and-conquer来对上述算法进行优化。假设所有的处理单元可以重新对编号进行排列使得Broadcast的发送者为编号为$1$的处理单元。同时,为了简化计算过程,假设对于某个自然数$n$$p = 2^n$。 现在我们可以通过从1 向 p/2 发送一次信息来把问题转化为两个大小为$p/2$的子问题编号为1的处理单元对1到p/2-1 的Broadcast以及编号为$p/2$的处理单元对$p/2$到$p$的Broadcast。我们便可以通过在这两个子问题上进行递归来完成这个算法并把临界条件定义为编号为i的处理单元在$[i,i]$这个区间里的Broadcast。此时由于i本身已经拥有该信息我们不需要做任何操作便可直接完成Broadcast。这个优化后的算法有(a+bl)\log p 时间复杂度,因为在算法的每一阶段$t$,我们有$2^t$个计算单元在并行运行Broadcast算子。同时算法一定会在\log p 步之内结束。

Reduce

除了Broadcast另一个常见的交互模式为程序试图概述在部分处理单元上得到的中间值。这时候对于一个符合结合律associative property的算子$f$我们可以定义Reduce算子即将所有处理单元上的某个值两两配对重复应用该算子并把最终结果储存在编号为$i$的计算单元上。常见的应用于Reduce中的算子有加和乘积最大值最小值和平均值等。一个简易的Reduce的优化实现同样可以用分治思想来实现即把$1$到$p/2-1$的Reduce结果存到编号为$1$的处理单元中,然后把$p/2$到$p$的Reduce结果存到$p/2$上。最后,我们可以把$p/2$的结果发送至$1$,执行$f$,并把最后的结果存至$i$。假设$f$的运行时间复杂度为常数并不改变其输出信息的长度$l$Reduce的时间复杂度仍然为$(a+bl)\log p$。

AllReduce

AllReduce算子为Reduce的一个变种即将f的结果存至所有处理单元上。在这里我们给出一个简化版的AllReduce 实现方式即首先把最终值通过Reduce存到编号为$1$的处理单元再将该值通过Broadcast广播到所有的处理单元上。在两个子算子都使用上述的算法情况下AllReduce的时间复杂度仍为(a+bl)\log p。

Gather

Gather算子尝试将每个处理单元上的信息全部聚合到编号为$i$的处理单元上通常用于组装散落在每个处理单元上的独立信息。在聚合函数符合结合律的情况下可以通过将其设为Reduce算子中的$f$来实现Gather算子。但是在这种情况下无论是基于链表还是数组的实现在每一步的Reduce子问题中$f$的时间复杂度或输出长度$l$都发生了改变。因此Gather并不具有先前Reduce或者Broadcast的时间复杂度而是$a \log p + (p-1) bl$。这是因为在算法的每一阶段t我们传输的信息长度为$l 2^t$。

AllGather

相比起GatherAllGather 算子会把聚合的结果存到所有的处理单元上。在这里一个简单的做法是使用Gather和Broadcast把聚合结果先存到编号为1的处理单元中再将其广播到剩余的处理单元上。这会产生一个$a \log p + (p-1) bl + (a+plb) \log p$的时间复杂度因为在Broadcast时如果忽略链表/数组实现所带来的额外空间开销,每次通信的长度为$pl$而不是$l$。简化后,我们得到了一个a \log p + plb \log p 的时间复杂度。在一个基于超立方体的算法下我们可以将其进一步优化到和Gather一样的a \log p + (p-1) bl :cite:Sanders2019-cq),然而由于篇幅问题便不再赘述。

Scatter

Scatter算子可以被视作Gather的逆运算把一个存在于编号为$i$的处理单元上,长度为$p$(信息长度为$pl$的链式数据结构L中的值分散到每个处理单元上使得编号为i的处理单元会得到$L[i]$。我们可以通过模仿Gather算法来设计一个简易的Scatter实现每一步的运算中与其是聚集一半处理单元的结果我们把现在的子链继续对半切分并把前半段和后半段作为子问题进行递归。这时候在算法的每一阶段$t$,我们传输的信息长度为$l 2^(m-t)$其中m是算法总共运行的步骤不会超过\log p 见Broadcast。最终Scatter算子的检疫实现和Gather一样都有a \log p + (p-1) bl 时间复杂度。在机器学习系统中相比于链式数据结构Scatter经常同时被用于可切分的数据结构例如张量tensor在一个维度上的p等分等。

ReduceScatter

ReduceScatter算子可以视为Reduce 和 Scatter算子的组合体即对于每个处理单元上分别拥有的一个链式/可切分数据结构在通过f 概述后再重新分散到各个单元中。虽然我们已经知道了Reduce 和Scatter 各自的时间复杂度但是在对ReduceScatter做时间复杂度分析时需要注意两部之间信息长度的变化假设每个处理单元上的数据结构所需通信长度为$pl$第一阶段的Reduce算法需要(a+plb)\log p 时间复杂度。参照Scatter的分析第二阶段的算子则需要 a \log p + (p-1) bl 时间复杂度。综合下来ReduceScatter 需要 a \log p + plb \log p 的时间复杂度和AllGather相同。同时运行ReduceScatter 和 AllGather的效果等同于运行一次AllReduce。

在SPMD中通常还有一些额外的集合通信算子如Prefix SumBarrierAll-to-All等但由于篇幅限制以及与机器学习系统的有限联系便不再赘述。最后由于该模型下通信网络的拓扑结构较为简单上文中呈现二叉树形的递归树也可以达到很好的实际运行速度。所有关于时间复杂度的分析也是基于这些相对简化的假设情况。后文中我们将会用AllReduce举例介绍如何在更复杂的拓扑结构下设计不同的集合通信算子变种并在时间复杂度之外去关注实际的通信量和运算时间。

在数据中心的梯度计算

接下来,我们将用一个示例来阐释集合通信在机器学习系统中发挥的重要作用。

数据中心 :width:800px 🏷️ch10-datacentre

:numref:ch10-datacentre 描述了一个典型的用于深度学习模型训练的数据中心。数据中心中的训练服务器一般会有多个设备。如需增加服务器我们会将多个训练服务器放置在一个机柜Rack同时接入一个架顶交换机Top of Rack Switch将其连接。在现有机柜满载的情况下可以通过在架顶交换机间增加骨干交换机Spine Switch来接入新的机柜。通过这种方式可以在数据中心内不断增加服务器从而为神经网络的训练提供海量的算力和内存。目前的商用数据中心可拥有近百万台服务器。

在数据中心中训练大型神经网络的首要挑战是如何高效计算大量的平均梯度。假设给定一个千亿级别参数的神经网络比如OpenAI 发布的大型语言模型GPT-3 :cite:gpt-3 有将近1750亿参数如果用32位浮点数来表达每一个参数那么每一步训练中一个数据并行模式下的模型副本Model Replica则需要生成700GB的本地梯度数据即 175G \times 4 bytes = 700GB。假如有3个模型副本那么至少需要传输1.4TB700GB \times $(3-1)$)的本地梯度数据(因为对于$N$个副本,只需传送其中的$N-1$个副本来完成计算。当平均梯度计算完成后需要进一步将其广播Broadcast到全部的模型副本即1.4TB的数据并更新其中的本地参数从而确保模型副本不会偏离Diverge主模型中的参数。

当前的数据中心一般使用以太网Ethernet构建不同机柜之间的网络。主流的商用以太网链路带宽一般在10Gbps到25Gbps之间。利用以太网传输海量梯度会产生严重的传输延迟从而降低模型训练的速度。新型深度学习训练集群如英伟达的DGX系列机器往往配置有更快的Inifiband。单个InfiniBand链路可以提供100Gbps或200Gbps的带宽。即使拥有这种高速网络传输TB级别的本地梯度依然需要大量延迟即使忽略网络延迟1TB的数据在200Gbps的链路上传输也需要至少40秒

为了避免通过机间网络传输数据现代深度学习服务器一般都会配备多个加速器例如说英伟达的DGX-3服务器会配备8个A100 GPU而在一个服务器内的多个设备可以通过高速机内网络互联如NVLink。这种高速机内网络可以提供高达400GBps的带宽从而让传输TB级别的数据成为可能。然而受限于单个服务器的散热成本和硬件等限制通常无法在一个服务器内无限制的持续增加设备。因此大型深度学习模型的训练仍需要多个服务器共同完成。在计算平均梯度时服务器需要同时借助机间网络通信接口以太网或InfiniBand和机内通信接口NVLink

基于AllReduce的梯度平均算法

我们将讨论如何利用AllReduce算子来实现数据中心中的高效梯度平均。首先参照前文的分析可以考虑一种简单的计算平均梯度的方法在集群中分配一个设备来收集本地梯度并在计算平均梯度后再将其广播到全部的设备。这种做法易于实现但是引入了两个问题。首先多台设备同时给该聚合设备发送数据时聚合设备会因严重的带宽不足产生网络拥塞。其次单台设备需要负担大量的梯度平均计算而受限于单台设备上的有限算力这种计算往往会受限于算力瓶颈。

AllReduce初始状态和终止状态 :width:800px 🏷️ch10-AllReduce-state

为了解决上述问题可以引入AllReduce算子的Reduce-Broadcast实现来优化算法其设计思路是通过让全部的节点参与到梯度的网络通信和平均计算中将巨大的网络和算力开销均摊给全部节点。这种做法可以解决先前单个梯度聚合节点的问题。假设有$M$个设备,每个设备存有一个模型副本,该模型由$N$个参数/梯度构成。那么按照AllReduce算子的要求需要先将全部的参数按照设备数量切分成$M$个分区Partition使得每个分区具有$N/M$个参数。我们首先给出这个算法的初始和终止状态。如 :numref:ch10-AllReduce-state 所示该例子含有3个设备。在每个设备有一个模型副本的情况下这个副本有3个参数。那么按照AllReduce的分区方法参数会被划分成3个分区3个设备而每一个分区则有1个参数$N/M$N代表3个参数M代表3个设备。在这个例子中假定设备1拥有参数2,4,6设备2拥有参数1,2,3设备3拥有参数4,8,12那么在使用AllReduce算子进行计算过后全部的设备都将拥有梯度相加后的结果7,14,21其中分区1的结果7是由3个设备中分区1的初始结果相加而成7 = 1 + 2 + 4。为了计算平均梯度每个设备只需要在最后将梯度之和除以设备数量即可分区1的最终结果为7除以3

AllReduce算法的过程 :width:800px 🏷️ch10-AllReduce-process

AllReduce算子会把梯度的计算拆分成$M-1$个Reduce算子和$M-1$个Broadcast算子其中$M$是节点的数量。其中Reduce算子用于计算出梯度的和SummationBroadcast算子用于把梯度之和广播给全部的节点。为了说明这些算子的执行过程可以参照 :numref:ch10-AllReduce-process 。AllReduce算子由Reduce算子开始在第一个Reduce算子中AllReduce算子会对全部节点进行配对Pairing让他们共同完成梯度相加的操作。在 :numref:ch10-AllReduce-process 的第一个Reduce算子中设备1和设备2进行了配对共同对分区1的数据相加。其中设备2把本地的梯度数据1发送给设备1设备将接收到1和本地的分区1内的梯度数据2进行相加计算出中间intermediate梯度相加的结果3。与此同时设备1和设备3进行配对共同完成对分区3的数据相加。而设备3和设备2进行配对共同完成对于分区2的数据相加。

在上述Reduce的算子中梯度的计算实现了以下几个特性:

  • 网络优化: 全部设备都同时在接收和发送数据利用起了每个设备的入口Ingress和出口Egress带宽。因此AllReduce过程中可利用的带宽是$M \times B$,其中$M$是节点数量,$B$是节点带宽,从而让系统实现网络带宽上的可扩展性。

  • 算力优化: 全部设备的处理器都参与了梯度相加的计算。因此AllReduce过程中可利用的处理器是$M \times P$,其中$M$是节点数量,$P$是处理器数量,从而让系统实现计算上的可扩展性。

  • 负载均衡: 由于数据分区是平均划分的,因此每次设备分摊到的通讯和计算开销是相等的。

在接下来的Reduce算子中AllReduce算法会对不同数据分区选择另外的配对方法。例如说在 :numref:ch10-AllReduce-process 的第二个Reduce算子中AllReduce算法会将设备1和设备3进行配对负责分区1的数据相加。将设备1和设备2进行配对负责分区2。将设备2和设备3进行配对负责分区3。在一个3个节点的AllReduce集群里在2个Reduce算子完成后我们就计算出了每个分区的数据相加结果分区1的结果7此时在设备3上分区2的结果14此时在设备1上分区3的结果21此时在设备2上

接下来AllReduce算法将进入Broadcast阶段。这一阶段的过程和Reduce算子类似核心区别是节点进行配对后他们不再进行数据相加而是将Reduce的计算结果进行广播。在 :numref:ch10-AllReduce-process 中的第一个Broadcast算子中设备1会将分区2的结果14直接写入设备3的分区2中。设备2会讲分区3的结果21直接写入设备1中。设备3会将分区1的结果直接写入设备2中。在一个3个节点的AllReduce集群中我们会重复2次Broadcast算子来将每个分区的Reduce结果告知全部的节点。

带宽计算

在讨论集合通信算子的性能时人们经常会使用一些数值化指标去量化不同的算法实现其中一个重要概念为带宽Bandwidth。在文献:cite:nvidia-nccl通常有两种主流的对带宽的计算方法分别为算法带宽Algorithm Bandwidth与总线带宽Bus Bandwidth

算法带宽

前文提到在计算点对点通信所需的时间是会在信息长度之上乘以一个系数b。这个系数就是算法带宽泛指单位时间内执行操作通信计算等的数量。一般计算公式为$b = s/t$,其中$s$代指操作的大小,$t$指操作指定的两个端点之间所经过的时间。以点到点通信举例,我们可以通过衡量一个大小已知的信息$m$在执行send函数时所花的时间来确定两个处理单元之间网络的带宽。

总线带宽

虽然算法带宽的计算方法既简单又高效但很难将其拓展至对于集合通信算子的带宽计算。这是因为取决于具体算子和算法实现的不同一个集合通信算子在执行过程中测得的算法带宽往往会远小于硬件本身的最高带宽。在实际运行相应的测试中经常能观测到随着处理单元增加算法带宽呈下降趋势。为了解决这一问题NCCL提出了总线带宽这一概念通过对于每个集合通信算子的分析来对测得的算法带宽乘以一个校正系数correction factor来减轻处理单元数量对于测量带宽的影响并给出一个更贴近实际硬件表现的带宽值。下面列出了一些常见算子的校正系数以及背后的简略推导。

  • AllReduce2(p-1)/p 对于在处理单元n_1, n_2 \cdots n_p 上的值 v_1, v_2 \cdots v_p 计算 $v_1 (op) v_2 \cdots (op) v_p$(其中$op$为符合结合律的算子),再存回每个处理单元中。在不考虑实际实现算法和网络拓扑的情况下,这个操作理论上只需要 2(p-1) 次数据传输,其中包含在每个处理单元上分开进行的 n-1 次 op的运算以及最后 n 次最终数据值的广播,再减去第一个处理单元的运算和最后一个处理单元的广播的影响。假设每个处理单元对于外界所有信息处理的带宽为$B$我们可以得出对于S个在不同处理单元上的数据运行AllReduce是能得到的最优情况下的运行时间$t = (2S(p-1)) / (pB)$,进行简化后可得 $B = (S/t)(2(p-1)/p) = b (2(p-1)/p)$。这里的 $2(p-1)/p$便是我们的校正系数。
  • ReduceScatter(p-1)/p 对于每个处理单元来说可以把ReduceScatter理解为只执行AllReduce中的聚合部分。对此我们只需要考虑上文分析中的$n-1$次$op$的运算,整理后可得$B = (S/t)((p-1)/p) = b ((p-1)/p)$。
  • AllGather(p-1)/p 同理对于每个处理单元来说可以把AllGather理解为只执行AllReduce中的广播部分。我们同理可得$B = (S/t)((p-1)/p) = b ((p-1)/p)$。
  • Broadcast1 与AllReduce不同的是Broadcast中所有数据需要从算子本身的发送者发出。即使在上文的分治情况下我们也需要等待所有子问题运行结束才能确保Broadcast算子本身的正确性。因此在计算带宽时瓶颈仍为发送者对于外界所有信息处理的带宽所以 $B = S/t$,即校正系数为$1$。
  • Reduce1 同BroadcastReduce需要将所有数据送往算子的接收者因此校正系数同样为$1$。

由于Gather和Scatter的带宽计算与实际聚合/分散时的数据结构相关性更高,故不给出特定的校正系数。

样例分析

针对不同的集群性质现代机器学习系统往往会灵活应用不同集合通信算子的组合来最大化通信效率。这里我们提供了两个具体的案例分析分别为微软的ZeRO 以及 OpenAI 的 DALL—E。

ZeRO

ZeRO :cite:rajbhandari2020zero是微软提出的神经网络优化器可用于训练千亿级参数的神经网络也在实践中成功训练了当时世界上最大的语言模型为高达170亿参数的transformer。在训练这个级别的神经网络时主要遇到的问题是巨量参数对于加速器内存的占用其中包括优化器本身的参数反向传播时的梯度以及模型参数本身。通过简易的计算不难得出170亿参数的模型在32位浮点表示情况下会占用至少680GB的内存远超于现在内存最高的深度学习加速器A100 最高内存80GB。于是我们需要考虑如何高效的把模型切成数份存储在不同的加速器上以及如何高效的通过使用集合通信算子来进行模型训练和推理。ZeRO对此提出了多个优化方法这里例举了三个典型的例子

  1. 首先可以发现在现代集群中节点内部加速器的带宽往往比节点之间的带宽要大很多。这在某种程度上偏离了上文中的理论框架。为此我们需要尽量减少节点间的通信尽量保证大部分通信仅存在于节点内部的加速器之间。在观察模型切分时不难看出模型本身前馈和反向传播时需要大量的在不同切片之间通信相比下来不同模型拷贝之间的梯度聚合反而具有相对较少的通信量。针对这一特性ZeRO选择了将单一模型的全部切片存储到同一节点内部从而大大提高了训练效率。
  2. 进一步地假设模型中的参数在层的细粒度上呈线性便可将其从前到后分别存储到不同加速其中。在前馈时可以注意到某一层的计算仅依赖于其相邻层的参数。对此与其是手动设计点到点通信我们可以对所有包含模型参数的加速器进行一次AllGather计算用来提取每一层之后一层的参数以及计算该层本身的激活值。为了节约内存我们在AllGather结束后立即丢弃除了该层以外其他层的参数。
  3. 同理在反向传播时我们只需要前一层的参数来计算本层的激活值和梯度因此我们只需要再次使用AllGather来完成每个加速器上的梯度计算。同时我们注意到在聚集梯度后对于每个加速器我们仅需要在内存中的层数的梯度。对此我们可以使用ReduceScatter算子来在平均后直接把相应的梯度存到编号为i的加速器上而不是通常情况下的AllReduce。

DALL-E

DALL-E :cite:ramesh2021zero是OpenAI提出的一个基于文字的图片生成模型模型同样拥有高达120亿参数。在训练时除了运用到ZeRO所使用的AllGather + ReduceScatter 技巧OpenAI团队在细节上做了进一步的优化以达到更快的训练速度。这里我们简略介绍以下和集合通信相关的两点

  1. 我们注意到集合通信算子的运行速度和通信本身的长度正相关。在模型训练中这代表了模型参数本身的大小。对此DALL-E 选择用矩阵分解matrix factorization的方法先把高维张量调整为一个二维矩阵通过分解后分开用集合通信算子进行传输从而大大减少了通信量。
  2. 另一个减少通信量的方法在于数据类型本身。一个显然的做法是使用16位的半精度浮点数相比正常的32位参数表示可以节省近一倍的通信量。但是在实践中发现低精度的数据类型会使得模型收敛不稳定往往导致最终训练效果大打折扣。为此OpenAI分析了DALL—E 的模型结构并把其中的参数根据对数据类型精度的敏感性分为了多个类。其中对精度最敏感的一类照常使用32位浮点表示并只通过AllReduce来同步而最不敏感的参数则照常通过矩阵分解进行压缩和传输。对于比较敏感的一类例如Adam 优化其中的动能moments和方差variance参数OpenAI 基于 IEEE 754 标准实现了两个全新的数据类型1-6-9和0-6-10其中第一表示正负所需的位数第二表示指数所需的位数第三表示有效数字所需的位数在节省空间和保持收敛性能之间找到了一个平衡。

集合通信与机器学习系统

最后,集合通信已经被深度集成到了整个机器学习系统之中,以至于一些在库级别以上的开发者很难意识到系统在训练和推理时的一些步骤是由底层逻辑实现的。 一般来说不同的机器学习系统对于集合通信一般提供了两个级别的抽象分别是更与硬件耦合的可以直接调用集合通信算子的库和更偏向神经网络实现的通过内部调用集合通信算子来实现分布式训练和推理的深度学习框架。作为算法工程师通常会接触到后者的抽象包括Horovod, KungFu, TensorFlow distributed等而作为集群的维护者往往需要深入了解前者的运行原理和具体的调试方法。以深度学习框架 PyTorch 举例在torch.distributed 命名空间namespace下实现了一系列方便开发者使用的分布式模型训练和推理函数。在其内部会根据实际运行的集群调用更底层的集合通信算子库例如MPINCCL前文中已有介绍适用于GPU分布式训练gloo适用于CPU分布式训练等。我们来具体对比PyTorch distributed 中对于AllReduce 的应用和 NCCL 的差异性:下面两段代码中,前者(:cite:li2022ddp通过PyTorch自带的分布式数据并行Distributed Data Parallel方法完成了一次简易的深度学习模型计算后者则通过gloo的Python 接口pygloo和Ray:cite:moritz2018ray完成了一个二维张量的AllReduce计算。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def demo_basic(rank, world_size):
    setup(rank, world_size)

    model = ToyModel().to(rank)
    # 通过调用DDP将模型在每个处理器上完成初始化
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    # 在反向传播时框架内部会执行AllReduce算法
    loss_fn(outputs, labels).backward()
    optimizer.step()

def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    run_demo(demo_basic, n_gpus)
import os
import ray
import pygloo
import numpy as np
import multiprocessing

@ray.remote(num_cpus=1)
def test_allreduce(rank, world_size, fileStore_path):
    context = pygloo.rendezvous.Context(rank, world_size)
    attr = pygloo.transport.tcp.attr("localhost")
    dev = pygloo.transport.tcp.CreateDevice(attr)
    fileStore = pygloo.rendezvous.FileStore(fileStore_path)
    store = pygloo.rendezvous.PrefixStore(str(world_size), fileStore)

    context.connectFullMesh(store, dev)

    sendbuf = np.array([[1,2,3],[1,2,3]], dtype=np.float32)
    recvbuf = np.zeros_like(sendbuf, dtype=np.float32)
    sendptr = sendbuf.ctypes.data
    recvptr = recvbuf.ctypes.data

    # 标明发送者和者并直接调用AllReduce
    pygloo.allreduce(context, sendptr, recvptr,
                    sendbuf.size, pygloo.glooDataType_t.glooFloat32,
                    pygloo.ReduceOp.SUM, pygloo.allreduceAlgorithm.RING)

if __name__ == "__main__":
    ray.init()
    world_size = multiprocessing.cpu_count()
    fileStore_path = f"{ray.worker._global_node.get_session_dir_path()}" + "/collective/gloo/rendezvous"
    os.makedirs(fileStore_path)
    ray.get([test_allreduce.remote(rank, world_size, fileStore_path) for rank in range(world_size)])

可以注意到前者并没有显式的调用集合通信算子而是通过DistributedDataParallel将分布式训练和正常训练之间的不同隐藏了起来。如果我们需要在不同集群上运行这段代码只需要在setup 函数内相对的更改PyTorch使用的底层集合通信库即可。在backward函数被调用时才会真正的使用AllReduce算法。相比下来如果想要直接使用gloo不仅需要使用一步一步的创建通信所需要的数据结构同时也很难和现有的模型训练框架无缝连接。