2018 年底,vivo AI 研究院为了解决统一高性能训练环境、大规模分布式训练、计算资源的高效利用调度等痛点,着手建设 AI 计算平台。经过两年的持续迭代,平台建设和落地取得了很大进展,成为 vivo AI 领域的核心基础平台。平台从当初服务深度学习训练为主,到现在演进成包含 VTraining、VServing、VContainer 三大模块,对外提供模型训练、模型推理和容器化能力。VTraining 是一站式的通用训练平台,支持多种框架的大规模分布式训练,并配备 PB 级别规模的分布式存储。现在 VTraining 已经有一定数量级的用户,来自人工智能、影像、互联网等多个部门;落地的业务众多,支撑着广告、语音、视觉、NLP 等核心业务模型训练的迭代。本文分享了 vivo AI 计算平台在 Kubernetes 集群进行弹性分布式训练的实践心得。
在谈及弹性分布式训练之前,熟悉大数据作业的同学也许对 MapReduce/Spark 中的弹性作业有所了解。MapReduce/Spark 支持可伸缩执行器(Scalable Executors)和测度执行(Speculative Execution)特性,使得大数据任务在运行过程中能够随集群作业的资源状况及节点的健康状态和运行性能来对任务 job 的分片进行弹性调度。类似于大数据任务中的弹性作业,弹性分布式训练旨在提供任务运行期动态调整任务资源的能力。弹性分布式训练可以解决以下几个痛点:
随着集群规模的扩大,集群中给定时刻出现任意一台机器故障的概率也在增加,当出现节点临时故障的问题,缺乏容错机制的训练任务往往只能自认倒霉、重头再来。在深度学习的训练中,根据训练数据的量级和算力规模大小,一个 epoch 有可能耗时几小时甚至一天,即便用户训练代码有一定的容错意识为训练提供了 checkpoint,出于性能考量,checkpoint 无法设置太频繁,用户只能接受最近训练结果丢失的事实,并承受重新进行任务调度的时间成本,模型的迭代周期有变长的风险。弹性任务在运行时解决容错问题,单节点的失败不会影响整体训练。
没有弹性能力的训练任务由于无法感知实时的集群资源状况,用户提交任务时只能事先确定好集群空闲资源,并根据这个信息来为任务分配固定算力。这意味着在集群资源出现空闲算力的情况下,如果没有新的任务提交,这些算力将一直闲置,即便用户的训练有利用更多算力提高模型训练效率的可能性,也需要用户自发重启任务进行重新调度。另一方面,由于一开始分配了固定算力,对于某些呈现多阶段不同资源使用 pattern 的任务来说,用户的任务有可能在某一环节长时间处于资源利用率低下的情况,这种情况由于不能回收资源,实际上空闲的资源也无法被再利用。弹性任务和调度系统配合能有效检测到这些潜在能被利用的资源,进而提升集群整体资源利用率。
在 k8s 集群管理过程中,经常需要对机器进行运维管理,将机器划分为不同的用途供不同的业务使用。这种场景在离在线混部实践中出现的非常频繁。如果变更了机器用途,在不撤掉原有已调度的离线任务的情况下,考虑到机器内空闲资源和系统稳定性等因素,机器将不能 100%地投入新的业务用途中。离线任务往往由于优先级不如在线任务,只能被折衷杀死。而弹性任务天然具有容错性,任务在不同用途的机器间进行漂移是支持的。
尽管在离线系统弹性任务的概念一直存在,可是由于各种原因在 k8s 上进行深度学习任务的弹性训练没有大规模应用开来。
MapReduce/Spark + Yarn 是大数据领域的标准方案,由于执行框架上原生支持,弹性任务对于用户来说就如同呼吸一样自然,用户只需要按照 MapReduce/Spark 规范编写程序就可以获得弹性能力。但深度学习领域框架百花齐放,而且大部分框架也不会预设用户将训练任务部署在何种执行环境以及何种调度系统上。在各个框架上实现弹性控制的模块,以及进行对应调度系统的适配来实现弹性训练,这一工作量将非常大。即便不同框架都推出了自己的弹性训练方案,平台层面要整合众多框架的方案也有很高的维护成本。我们希望能以一种维护成本相对较低的方式支持尽可能多个框架的弹性训练。
现代深度学习模型训练方式基本上都是基于数据并行的,Parameter Server 方式的数据并行在异步训练时由于训练算法的数学特性,天然具有容错性,任意 worker 失败只需要重启后加入训练即可,然而异步训练只有引入延迟补偿机制才能比较好地保持模型训练的收敛性和效果;另外,Tensorflow 关于 Parameter Server 训练方式的实现上,利用 tf.train.MonitoredTrainingSession 实际上也可以为同步或异步提供基于 checkpoint 的容错机制,只是这种方式中当 Chief Worker 节点被挂起,任务依然只能通过重启恢复;对于基于 RingAllReduce 的同步训练情况,框架层面能提供的错误容忍也很有限, 比如 Pytorch DDP 在不依赖 TorchElastic 情况下当 worker 挂起会导致 nccl hang。
考虑到要在所有框架上支持所有训练模式的容错训练非常困难,而且目前平台业务中深度学习模型更多采用的是 RingAllReduce 的训练方式, 我们在实践中暂时先支持 RingAllReduce 同步训练的容错。
在 k8s 集群中部署机器学习训练任务比较主流的方案是使用 kubeflow,平台在实践过程中经过调研,发现 kubeflow 中的 tf-operator 项目具有极高通用性和灵活性,能适配支持多种不同类型训练任务的生命周期管理,比如原生支持基于 Tensorflow 的 ParameterServer 的训练,简单适配还可以支持 Pytorch Distributed DataParallel 任务、Kaldi 任务、MPI 任务和 XGBoost 任务等,完全满足训练平台支撑多种业务多框架训练的要求。也正是这种前提条件下,训练平台重度依赖了 tf-operator 组件。
目前 tf-operator 社区暂时没有支持弹性训练任务的打算, 用户需要弹性训练就需要修改 tf-operator 的实现,或者在站在更高的维度实现 controller 尝试利用 tf-operator 的机制来支持任务算力的弹性调控。
事实上社区针对这个问题也进行过讨论(https://github.com/kubeflow/tf-operator/issues/708),从最终的结论来看 tf-operator 开发者也更倾向于从上层来建立弹性的机制。
出于可维护性和灵活性考虑,我们选择第二种方式。
tf-operator 由于从一开始就是为 Tensorflow 作业服务,在管理作业的生命周期时主要考量了 Tensorflow ParameterServer 的训练模式,pod 被划分为三种角色类型 Chief/Master、Parameter Server 以及 Worker。Chief/Master 一般是统筹整个作业健康状态的角色,比如 Ingraph Replication 中的 Client 以及 Tensorflow2.x 中的 ClusterCoordinator; Parameter Server 是专门用于存放参数分片的一组服务节点;Worker 是训练中负责计算并同步参数的节点。当 Chief/Master 角色不存在时甚至连 Parameter Server 也不存在时,Worker 中的 0 号节点将充当 ChiefWorker,对应 Between Graph Replication 中使用 MonitoredSession 指定 Worker0 为 Chief 或在 Worker0 启动 MPI-based RingAllReduce 训练的场景。
tf-operator 在管理过程中会根据训练模式中不同角色的约定及特性来决定作业的状态,当节点被挂起后根据节点角色的语义判断是否需要重新创建。
以下是 tf-operator 的基本工作流程图:
其中,syncTFJob 是 tf-operator 管理对象的核心逻辑实现,内部对不同 ReplicaType 的 Pod 进行检查并推导出任务状态,逻辑流程图如下:
根据上边的流程图可知,当作业中既有 Chief 又有 Worker 时,正常情况下 Chief 的状态决定了整个作业的状态,当有 Worker 被驱逐时 operator 会根据当前 TFJob 配置中的 Replicas 的数值来确认是否需要创建新 Pod。这意味着我们可以在上层通过设置 TFJob 的对应 ReplicaType 的 Replicas 数值以及调用 k8s api 驱逐 pod 来实现作业伸缩。
根据上边介绍 tf-operator 的工作原理,我们可以标准化弹性作业的角色,为每个弹性作业配备一个 Chief 来负责作业容错以及弹性算力感知的工作,并在 PS(如果有的话)或 Worker 上建立容错机制来实现弹性训练。基于这一前提,我们实现了一个提供 ScaleJob 接口的作业管理服务来接收来自其他组件的作业伸缩请求,底层利用 operator 本身维护 TFJob 状态并重建 Worker 的机制来实现作业的横向伸缩,甚至纵向伸缩。
以横向伸缩为例,当需要扩张 Worker 时,修改 TFJob 的 Worker Type 的 Replicas 至更大数值,tf-operator 便会自动创建 Worker Pod; 当需要收缩 Worker 时,修改 TFJob 的 Worker Type 的 Replicas 至更小数值, 并调用 Evict/Delete api 驱逐多余的 Pod。同时为了保证 gang-scheduler 的调度机制(如抢占逻辑)正常进行,以及考虑到 job 的 voluntary disruptions 策略,伸缩需要维护 PodDisruptionBudget 对象,将 minAvailable 调至合适数值。
具体操作流程图如下所示:
上边的 stable window 是一个训练 worker 状态变化开始的不稳定状态逐步到稳定状态的过渡时间窗口,包含了 worker 重新创建或销毁的时间。
实现的时候考虑到不同任务 effective training batch 对训练的影响,我们为 TFJob 对象指定 Annotation 来标明每个任务支持的 Worker 数量范围,调整 replicas 数量和 pdb minAvailable 数值时会考虑这个信息。
作业的垂直伸缩可以一种取巧的方式实现,即通过横向伸缩的方式使用新资源配比的 worker 逐台替换掉老的 worker,流程如下:
这个过程中通过更新 TFJob 中的 Worker 的 ResourceSpec,再删除老 worker 并由 tf-operator 重新创建具有最新资源配置的新 worker,视感上可以得到作业的 pod 资源垂直伸缩的效果。特别地,当原来作业资源只有一个时,考虑到深度学习任务中的参数在线同步,需要先带起一个新 worker 来获得参数的副本,再等垂直伸缩完成后删除这个备份的副本。
在平台的大部分训练业务中,深度学习的模型训练上主要采用了 horovod 来进行 RingAllReduce 训练,为此我们利用了 horovod 在 0.20 版本以上支持的 elastic 功能;另外语音识别业务由于技术选型上重度依赖 kaldi 框架,而 kaldi 的负载分发引擎不支持 k8s,开源社区也并没有支持的打算,我们自研了基于 k8s 的负载分发框架,并支持类似 Sun Grid Engine 体验的弹性伸缩。
horovod 0.20 之前的版本使用了 mpi collective 通信原语来实现整个 RingAllReduce 过程的 controller,0.20 之后的版本考虑到弹性算力感知的需要使用了 gloo 的通信库来实现 controller 以便能动态修改通信的 world_size 以及调整每个训练 Worker 的 rank。
horovod elastic 的工作原理如下图:
其中每个 Worker 在 driver 下发训练后都将进入 elastic 循环,用户训练代码也包裹其中,如下图:
一开始作业启动时,horovod driver(horovodrun 程序)从 HostDiscovery 接口或根据静态指定的 Static Host 选项中获取到可以用于本次作业的算力(k8s 中的 Pod),只要此时允许调度的 working slot 的数量大于允许数量,driver 就往对应 working slots 上分发训练程序;分发的 elastic 训练程序在对应的 slot 启动后便向 driver 汇报,并获取本次训练的 WorldSize 以及自己 slot 的 Rank 和 LocalRank 等, 这个过程称为集会(rendezvous)。
当训练节点需要被驱逐时,horovod 引入了三种容错机制,一是 driver 通过 HostDiscovery 检测到 Worker 数发生变化,通知各 Worker 发生了 HostUpdate;二是为 gloo operations 指定超时来为 controller 进行通信时进行容错; 三是通过在 horovod 的 elastic loop 中捕捉 Boardcast、AllReduce 及 AllGather 等异常。这样当错误检测发生时,所有 worker 会重回集会过程,等待算力满足条件后再重新开始训练;为了减少因为重新训练而重复进行的训练步数,horovod 提供了 api 允许用户在 elastic loop 中每隔若干个 steps 进行一次 commit 操作来保存梯度,并且当集会结束后进行一次 Boardcast 来同步梯度和超参数。
当有新的训练节点加入时,driver 从 HostDiscovery 接口探知到新增的 slots,给现存的 Rank0 的节点发送 HostUpdate 消息,Rank0 的训练程序接收到消息后将进行 horovod shutdown,导致所有的 worker 重新进入集会;同时 driver 往新增的 slots 下发训练程序,所有 slot 的训练程序完成集会及信息同步后会继续训练。
在节点驱逐场景,horovod elastic 原生实现存在一些缺陷,一是当节点失败后,无论是驱逐还是节点崩溃,都认为节点无法恢复,这在 k8s 集群中显然是不合理的,因为 tf-operator 在可能的情况下总是会重建同名的新 worker;二是在 GPU 训练场景,当指定 HOROVOD_GPU_OPERATIONS 为 nccl 时,nccl 由于通信过程在 cuda kernel 内部,peer 节点被驱逐时有可能导致算子中断引发 hang 或 coredump 现象,此时错误并不能被优雅捕捉到,导致整个作业失败。针对这两个问题,平台对官方的 horovod 进行了改造,第一个问题修改了 horovod driver 代码,为失败 blacklist 的节点增加了重试机制,并向官方提交了 pr(https://github.com/horovod/horovod/pull/2483); 问题二中,我们引入了一种基于 SigUSR1 信号的优雅退出机制,在 elastic loop 中捕捉该信号,并通知 horovod shutdown 来消耗已经入队的梯度操作,同时切断 gloo 通信来触发 gloo operations timeout 来通知其它 peer。
另外,horovod 的 HostDiscovery 接口实际上是通过执行脚本的形式来实现的,horovod 根据脚本的执行结果知道当前可以进行训练的 slot 信息。实践过程中,平台为弹性作业的 pod 添加 PreStart/PostStop 生命周期的钩子,当有新 pod 启动时往分布式存储写入一个 key,pod 销毁时注销这个 key。HostDiscovery 脚本在执行过程中动态检查 pod key 的集合,并根据 pod 的申请资源的情况决定可以执行训练的 slot 的数量。
语音识别的 kaldi 提特征、解码等 cpu 型任务,具有和大数据作业类似的属性,任务中一个 executor 重启不会影响整体,只要任务一开始切分好需要处理的数据,同一任务内不同的作业之间都是相对独立的。所以只要使用的作业分发框架支持对作业失败状态的捕捉就可以实现容错;另外,只要将数据切分得足够多份,当新增弹性算力时,负载分发框架感知到这种变化后可以将 pending 的作业调度到新增加的 slot 中。
负载分发框架对算力的感知同样也依赖我们对 pod 生命周期的钩子。
kaldi elastic 负载分发框架工作原理如下图:
可以看出,作业容器的资源会由 ResourceAgent 监听分布式存储中作业的 pod 对应的 key,统一管理并编排成 slots 的形式,当 kaldi driver 程序发送计算调度请求时会由 ResourceAgent 查找当前容器来空闲的 slot,并为执行进程绑定 slot 信息;当空闲 slot 不足时,会主动请求 controller 申请扩容;当 slot 长时间空闲时,会请求 controller 回收对应的 pod。
基于上述方案的弹性分布式训练,依然有一个漏洞,Chief 节点对作业来说属于 Critical 节点,当 Chief 节点被驱逐或节点失败时,作业会重启或失败。为了要保证 Chief 的可用性,我们可以为 Chief 节点指定 nodeSelector,指定专用的非弹性用途的节点用于调度;另外由于 Chief 节点负载非常低,这类专用节点一般是稳定的。
弹性分布式训练可以大大提高集群资源利用率以及资源配置的灵活性,vivo AI 计算平台建立了初步的弹性分布式训练机制,支持深度学习基于 RingAllReduce 的弹性训练和语音 Kaldi 识别任务的弹性作业。未来平台还将支持 ParameterServer 的弹性算力感知,逐步打通与如利用率监控和离在线混部资源管理等资源调控组件的关节,不断完善弹性分布式训练的基础设施。
vivo AI 计算平台 Kubernetes 集群 Ingress 网关实践
作者介绍:
林国泽,曾就职于商汤科技,目前是 vivo AI 研究院计算平台组的资深工程师,关注领域包括机器学习系统、高性能计算、云原生技术等。
领取专属 10元无门槛券
私享最新 技术干货