Uber开发了POET、Go-Explore和GTN等算法,这些算法利用大量的计算来训练神经网络模型。为了使未来几代类似算法的大规模计算成为可能,Uber进而开发了一种新的分布式计算库Fiber,它可以帮助用户轻松地将本地计算方法扩展到成百上千台机器上。Fiber可以使使用Python的大规模计算项目变得快速、简单和资源高效,从而简化ML模型训练过程,并获得更优的结果。
本文最初发布于Uber工程博客,由InfoQ中文站翻译并分享。
项目地址:https://github.com/uber/fiber
在过去的几年中,计算机不断增强的处理能力推动了机器学习的进步。算法越来越多地利用并行性,并依赖分布式训练来处理大量数据。然而,随之而来的是增加数据和训练的需求,这对管理和利用大规模计算资源的软件提出了巨大的挑战。
在Uber,我们开发了POET、Go-Explore和GTN等算法,这些算法利用大量的计算来训练神经网络模型。为了使未来几代类似算法的大规模计算成为可能,我们开发了一种新的分布式计算库Fiber,它可以帮助用户轻松地将本地计算方法扩展到成百上千台机器上。Fiber可以使使用Python的大规模计算项目变得快速、简单和资源高效,从而简化ML模型训练过程,并获得更优的结果。
在理想情况下,将运行在一台机器上的应用程序扩展为运行在一批机器上的应用程序应该很容易,只需更改命令行参数即可。然而,在现实世界中,这并不容易。
我们每天都与许多运行大规模分布式计算任务的人一起工作,我们发现,现在很难利用分布式计算的原因有以下几个:
新的Fiber平台专门解决了这些问题。它为更广泛的用户群体提供了无缝使用大规模分布式计算的可能。
Fiber是一个用于现代计算机集群的基于Python的分布式计算库。用户可以利用这个系统针对整个计算机集群进行编程,而不是只针对单个台式机或笔记本电脑。它最初是为了支持像POET这样的大规模并行科学计算项目而开发的,Uber也已经用它来支持类似的项目。Fiber的功能非常强大,这样主要是因为:
除了这些好处之外,Fiber还可以在特别关注性能的领域与其他专用框架搭配使用。例如,对于随机梯度下降(SGD),Fiber的Ring特性可以帮助我们在计算机集群上建立分布式训练作业,并允许它与Horovod或torch.distributed协同。
图1:Fiber启动许多不同的作业支持(job-backed)进程,然后在其中运行不同的Fiber组件和用户进程。Fiber Master是管理所有其他进程的主进程。有些进程(如Ring Node)保持成员之间的通信。
Fiber可以帮助从事大规模分布式计算的用户减少从产生想法到在计算集群上实际运行分布式作业的时间。它还可以帮助用户屏蔽配置和资源分配任务的繁琐细节,并且可以缩短调试周期,简化从本地开发到集群开发的转换。
Fiber让我们可以灵活地为经典的多处理API选择可以在不同集群管理系统上运行的后端。为了实现这种集成,Fiber被分为三个不同的层:API层、后端层和集群层。API层为Fiber提供了进程、队列、池和管理器等基本构建块。它们具有与多处理相同的语义,但是我们对它们进行扩展了,使它们可以在分布式环境中工作。后端层处理在不同集群管理器上创建或终止作业的任务。当用户新增一个后端时,所有其他Fiber组件(队列、池等)都不需要更改。最后,集群层由不同的集群管理器组成。尽管它们不是Fiber本身的一部分,但是它们帮助Fiber管理资源并跟踪不同的作业,减少了Fiber所需要跟踪的项的数量。图2是总体架构图:
图2:Fiber的架构包括API层、后端层和集群层,这让它可以在不同的集群管理系统上运行。
Fiber引入了一个新的概念,称为作业支持过程(也称为Fiber进程)。这些进程与Python多处理库中的进程类似,但是更灵活:多处理库中的进程只在本地机器上运行,但Fiber进程可以在不同的机器上远程运行,也可以在同一机器上本地运行。当新的Fiber进程启动时,Fiber会在当前计算机集群上创建一个具有适当Fiber后端的新作业。
图3:Fiber中的每个作业支持进程都是在计算机集群上运行的一个容器化作业。每个作业支持进程也有自己的CPU、GPU和其他计算资源。在容器内运行的代码是自包含的。
Fiber使用容器来封装当前进程的运行环境(如上图3所示),其中包括所有必需的文件、输入数据和其他依赖的程序包,而且要保证每个元素都是自包含的。所有子进程都以与父进程相同的容器镜像启动,以确保运行环境的一致性。因为每个进程都是一个集群作业,所以它的生命周期与集群上的任何作业相同。为了方便用户,Fiber被设计成直接与计算机集群管理器交互。因此,不像Apache Spark或ipyparallel,Fiber不需要在多台机器上设置,也不需要通过任何其他机制引导。它只需要作为一个普通的Python pip包安装在一台机器上。
Fiber基于Fiber进程实现了大多数多处理API,包括管道、队列、池和管理器。
Fiber中队列和管道的行为方式与多处理相同。不同之处在于,Fiber中的队列和管道由运行在不同机器上的多个进程共享。两个进程可以从同一个管道读取和写入数据。此外,队列可以在不同机器上的多个进程之间共享,每个进程可以同时向同一队列发送或从同一队列接收信息。Fiber队列是用高性能异步消息队列系统Nanomsg实现的。
图4:Fiber可以在不同的Fiber进程之间共享队列。在本例中,一个Fiber进程与队列位于同一台机器上,另外两个进程位于另一台机器上。一个进程写入队列,另外两个进程读取队列。
Fiber也支持池,如下图5所示。它们让用户可以管理工作进程池。Fiber使用作业支持进程扩展池,以便每个池可以管理数千个(远程)工作进程。用户还可以同时创建多个池。
图5:在具有三个工作进程的池中,如本例所示,两个工作进程位于一台机器上,另一个位于另一台机器上。它们共同处理提交到主进程中任务队列的任务,并将结果发送到结果队列。
管理器和代理对象使Fiber能够支持共享存储,这在分布式系统中至关重要。通常,这个功能由计算机集群外部存储系统如Cassandra和Redis提供。相反,Fiber为应用程序提供了内置的内存存储。该接口与多处理系统中的管理器类型接口相同。
Ring是对多处理API的扩展,可以用于分布式计算设置。在Fiber中,Ring指的是一组共同工作的、相对平等的进程。与池不同,Ring没有主进程和辅助进程的概念。Ring内的所有成员承担大致相同的责任。Fiber的Ring模型拓扑(如下图6所示)在机器学习分布式SGD中非常常见,torch.distributed和Horovod就是例子。一般来说,在一个计算机集群上启动这种工作负载是非常具有挑战性的;Fiber提供Ring特性就是为了帮助建立这样的拓扑。
图6:在一个有四个节点的Fiber Ring中,Ring节点0和Ring节点3运行在同一台机器上,但在两个不同的容器中。Ring节点1和节点2都在单独的机器上运行。所有这些进程共同运行同一函数的副本,并在运行期间相互通信。
借助上述灵活的组件,我们现在可以使用Fiber构建应用程序了。在这一节中,我们将展示两种使用Fiber帮助用户构建分布式应用程序的方式。
在下面的例子中,我们将展示工程师如何运用Fiber来实现大规模分布式计算。这个例子演示的是一个强化学习(RL)算法。通常,分布式RL的通信模式涉及在机器之间发送不同类型的数据,包括动作、神经网络参数、梯度、per-step/episode观察及奖励。
Fiber实现了管道和池来传输这些数据。在底层,池是普通的Unix套接字,为使用Fiber的应用程序提供接近线路速度的通信。现代计算机网络的带宽通常高达每秒几百千兆。通过网络传输少量数据通常速度很快。
此外,如果有许多不同的进程向一个进程发送数据,进程间通信延迟也不会增加太多,因为数据传输可以并行进行。事实证明,Fiber池可以作为许多RL算法的基础,因为模拟器可以在各个池工作进程中运行,并且结果可以并行回传。
下面的示例显示了使用Fiber实现的简化RL代码:
# fiber.BaseManager is a manager that runs remotely
class RemoteEnvManager(fiber.managers.AsyncManager):
pass
class Env(gym.env):
# gym env
pass
RemoteEnvManager.register(‘Env’, Env)
def build_model():
# create a new policy model
return model
def update_model(model, observations):
# update model with observed data
return new_model
def train():
model = build_model()
manager = RemoteEnvManager()
num_envs = 10
envs = [manager.Env() for i in range(num_envs)]
handles = [envs[i].reset() for i in num_envs]
obs = [handle.get() for handle in handles]
for i in range(1000):
actions = model(obs)
handles = [env.step() for action in actions]
obs = [handle.get() for handle in handles]
model = update_model(model, obs)
许多Python用户利用了多处理。Fiber为此类应用程序提供了更多的机会,通过这种系统,只需更改几行代码,就可以在类似于Kubernetes的计算机集群上的分布式设置中运行。
例如,OpenAI Baselines是一个非常流行的RL库,它有许多参考算法,比如DQN和PPO。它的缺点是只能在一台机器上工作。如果希望大规模地训练PPO,就必须创建自己的基于MPI的系统并手动设置集群。
相比之下,有了Fiber,事情就简单多了。它可以无缝地扩展像PPO这样的RL算法,从而利用分布式环境的数百个工作进程。Fiber提供了与多处理相同的API,OpenAI Baselines就是使用这些API在本地获取多核CPU的处理能力。要让OpenAI Baselines使用Fiber,只需要修改一行代码:
修改完这行代码,OpenAI Baselines就可以在Kubernetes上运行了。我们在这里提供了在Kubernetes上运行OpenAI Baselines的完整指南。
Fiber实现了基于池的错误处理。在创建新池时,还将创建关联的任务队列、结果队列和挂起表。然后,用户可以将新创建的任务添加到任务队列中。该任务队列由主进程和工作进程共享。每个工作进程从任务队列中获取一个任务,然后在该任务中运行任务函数。每当用户从任务队列中删除一个任务时,Fiber就会在挂起表中添加一个条目。工作进程完成该任务后会将结果放入结果队列中。然后,Fiber从挂起表中删除与该任务相关的条目。
图7:上图是一个包含四个工作进程的普通Fiber池。在下图,Worker 3出现故障,因此Fiber启动一个新的工作进程(Worker 5),然后准备将其添加到池中。
如果池里有一个工作进程在处理过程中失败,如上图7所示,父池作为所有工作进程的进程管理器将会检测到该失败。然后,如果这个失败的进程有挂起任务,则父池会将挂起表中的挂起任务放回到任务队列中。接下来,它启动一个新的工作进程来替换之前失败的进程,并将新创建的工作进程绑定到任务队列和结果队列。
Fiber最重要的应用之一是扩展计算算法(如RL)和基于群体的方法(如ES)。在这些应用程序中,延迟非常关键。RL和基于群体的方法经常应用于需要与模拟器(如ALE、Gym和Mujoco)频繁交互以评估策略和收集经验的设置中。等待模拟器结果所带来的延迟严重影响了整体的训练性能。
为了测试Fiber,我们将其性能与其他框架进行了比较。我们还在框架开销测试中增加了Ray,以提供一些初步结果,并希望在将来添加更详细的结果。
通常有两种方法可以减少RL算法和基于群体的方法的延迟。要么我们可以减少需要传输的数据量,要么我们可以提升不同进程之间通信通道的速度。为了加快通信处理,Fiber使用Nanomsg实现了管道和池。此外,用户还可以使用speedus这样的库进一步提高性能。
通常,框架中的组件会影响计算资源,因此,我们测试了Fiber的开销。我们比较了Fiber、Python多处理库、Apache Spark、Ray和ipyparallel。在测试过程中,我们创建了一批工作负载,完成这些任务所需的总时间是固定的。每个任务的持续时间从1秒到1毫秒不等。
对于每个框架,我们在本地运行了5个工作进程,并通过调整批次的大小来确保每个框架的总耗时大约为1秒(即1毫秒的任务,我们运行了5000个)。我们假设,Fiber的性能应该和多处理差不多,因为Fiber和多处理都不依赖于复杂的调度机制。相反,我们认为Apache Spark、Ray和ipyparallel会比Fiber慢,因为它们中间依赖于调度器。
图8:在测试Fiber、Python多处理库、Apache Spark和ipyprallel的框架开销时,我们在本地运行了5个工作进程,并调整批次大小,使每个框架在大约1秒钟内完成任务。
当任务持续时间为100毫秒或更多时,Fiber几乎没有表现出任何差异,当任务持续时间降至10毫秒或1毫秒时,它比其他框架更接近多处理库。
我们以多处理作为参考,因为它非常轻量级,除了创建新进程和并行运行任务外没有实现任何其他特性。此外,它还利用了仅在本地可用的通信机制(例如共享内存、Unix域套接字等)。这使得支持分布式资源管理系统的其他框架难以超越多处理,因为这些系统无法利用类似的机制。
图9:我们的开销测试显示,Fiber的执行情况与Python多处理库类似,在1毫秒处,ipyparallel和Apache Spark处理任务的耗时更长。最佳完成时间为1秒。
与Fiber相比,ipyparallel和Apache Spark在每个任务持续时间上都落后很多。当任务持续时间为1毫秒时,ipyparallel花费的时间几乎是Fiber的24倍,Apache Spark花费的时间是后者的38倍。显然,当任务持续时间较短时,ipyparallel和Apache Spark都引入了相当大的开销,而且,对于RL和基于群体的方法,它们不如Fiber合适,后者使用了模拟器,响应时间只有几毫秒。我们还可以看到,在运行1毫秒的任务时,Ray花费的时间大约是Fiber的2.5倍。
为了探究Fiber的可伸缩性和效率,我们将其与ipyparallel进行了比较,由于之前的性能测试结果,我们没有考虑Apache Spark。我们也排除了Python多处理库,因为它不能扩展到一台机器之外。我们运行了50次进化策略迭代(ES),根据它们的耗时对比了Fiber和ipyparallel的可伸缩性和效率。
在工作负载相同的情况下,我们预计Fiber可以完成得更快,因为前面已测试过,它的开销比ipyparallel小得多。对于Fiber和ipyparallel,我们使用的群体大小为2048,因此,无论工作进程的数量多少,总计算量都是固定的。我们还在两者中实现了相同的共享噪音表,每八个工作进程共享一个噪音表。这项工作的实验域是OpenAI Gym Bipedal Walker Hardcore环境的一个修改版本,这里对修改进行了描述。
图10:当ES迭代50次以上时,使用不同数量的工作进程运行ES,Fiber的扩展性均优于ipyparallel。每个工作进程在单个CPU上运行。
主要结果是,Fiber的扩展性比ipyparallel更好,并且完成每次测试的速度明显更快。随着工作进程数从32增加到1024,Fiber的运行时间逐渐缩短。相比之下,当工作进程数从从256增加到512时,ipyparallel的运行时间逐渐变长。在使用1024个工作进程时,由于进程之间的通信错误,ipyparallel未能完成运行。这个失败削弱了ipyparallel运行大规模并行计算的能力。根据Amdahl定律,我们看到,当工作进程数增加到512以上时,Fiber的收益会减少。在这种情况下,主进程处理数据的速度就会成为瓶颈。
总的来说,在所有工作进程数的测试中,Fiber的性能都超过了ipyparallel。此外,与ipyparallel不同的是,Fiber在运行1024个工作进程时也完成了这项工作。这个结果更能显示出Fiber与ipyparallel相比具有更好的可伸缩性。
Fiber是一个新的Python分布式库,现已开源。我们设计它是为了让用户能够在一个计算机集群上轻松地实现大规模计算。实验表明,Fiber实现了我们的许多目标,包括有效地利用大量的异构计算硬件,动态地伸缩算法以提高资源使用效率,以及减少在计算机集群上运行复杂算法所需的工程负担。
我们希望,Fiber将进一步加快解决工程难题的进展,使开发方法并大规模地运行以了解其好处变得更容易。要了解更多细节,请查看Fiber GitHub库。
查看英文原文:
领取专属 10元无门槛券
私享最新 技术干货