互联网应用通常会产生大量的时间日志需要进行分析和处理。本文介绍Ubiq的架构,它是一个分布式系统,用于处理不断增长的日志文件,具有可扩展性、高可用、低延迟的特性。Ubiq框架容忍基础设施退化和数据中心级别的中断问题,无需人工干预。并且它支持exactly-once语义以将日志作为事件的集合进行处理。Ubiq已经应用于Google的广告系统多年,生产环境证明了机器资源的线性可扩展性,以及基础设置故障的情况下的高可用性和一分钟内的端到端的延迟。
当今的大多数互联网应用都是以数据为中心的:有底层的数据库基础设施驱动,交付产品给用户。同时,用户和这些应用的交互产生大量的数据需要被处理、分析产出详细的报告以增强用户体验和提升盈利能力。所有的应用几乎都是通过网络访问的,用户可以在任何地方任何时刻进行访问。这种无处不在的访问方式的结果是产生连续的数据,称为data stream(数据流)。在应用的上下文中,数据流是一系列事件,有效的表示用户与应用的交互历史。数据被存储在大量的日志文件中,统称为输入日志。日志捕获了大量的信息,这些信息可以进行分析来获得更高层次的信息,并深入了解应用的特性。通常,这些分析需要复杂的应用逻辑,包括对数据进行joining、aggregation、summarization。大多数互联网应用都需要后后端的基础设施来处理不断添加到输入日志中的数据。此外,这种处理应该是可以自动伸缩、可抵御失败的,并且应该提供良好的一致性语义。
Ubiq的目标是提供一个易于集成的日志处理框架,使开发者不需要关心底层基础设施的伸缩性、容错性、延迟、一致性等问题。Ubiq期望输入日志在分布于全球多个区域的数据中心都提供可用性。相同和不可变的输入日志确保系统能承受数据中心计划内和计划外的中断。Ubiq在多个数据中心处理输入日志,因此是多宿主的(multi-homed):processing pipeline并行的运行在多个数据中心,以生成具有多个副本的全局同步输出流。
尽管经常有人认为数据中心的故障是罕见的,在体系结构上处理他们是多余的,但是在Google的规模下,这种故障确实会发生。我们遇到数据中心中断的原因有两个:1. 因为外部因素(电源故障、光纤中断等)造成的部分或全部停机;2. 计划维护而造成的停机。计划内的停机可以通过将系统从一个数据中心迁往另一个数据中心解决。然后在实际情况中,我们发现迁移是非常困难的,此类系统往往占用了大量的空间,精确的检查系统状态,并且在不影响用户的情况下恢复是一项非常困难的任务。在过去十年,我们尝试了多种方式来将系统从一个不健康的数据中心迁移到另一个数据中心,我们当前的结论是,使系统支持多宿主(multi-homed)是最优的方案。
过去十年,已经建立了许多的流处理系统,除了Google的Photon之外,我们还不知道哪个已经发布的系统使用geo-replication和multi-homing来提供高可用和完全一致性,即使在数据中心故障的情况下也是如此。Photon为需要追踪event级别状态的应用而设计,例如需要joinging不同的日志源。但是,对于数据传输型应用,例如聚合和格式转换,这是一个资源密集型的解决方案,在这些应用程序中,只需要跟踪event bundles级别的状态即可,也就是将多个事件作为单个处理单元。事件绑定需要的机器资源少得多,并且需要不同于Photon的设计/性能考虑以及故障语义。Ubiq对backup woker,woker allocation采用不同的机制,并且具有不同的延迟和资源利用特性。在Section 7中有对Photon和Ubiq具体的对比。
以下是Ubiq架构足够通用,足以部署在各种应用环境中需要克服的挑战:
下面是解决Ubiq面临的挑战的一些核心思路:
本文采用以下方式进行组织:第2节介绍Ubiq的总体架构和一些关键组件的实现细节;第3节介绍Ubiq设计的关键特性——在单数据中心或多数据中心下的exactly-once、fault tolerance、scalability;第4节演示如何在数据转换和聚合的应用程序中集成Ubiq;第5节总结基于Ubiq处理的生产指标和性能数据;第6节介绍在几十个Ubiq生产环境中部署学到的经验和教训;第7节介绍相关的工作;第8节对全文进行了总结。
图1说明了Ubiq在单个数据中心下的整体架构。图中的数字展示了系统中的数据流:
输入所述,Ubiq框架相对简单明了,挑战在于1.1节描述的强一致、可扩展性、可靠性和效率。
Expectations for Input Log Data:Ubiq期望多个数据中心输入的文件达到最终每个字节的一致。比如,新文件会被加入,已经存在的文件会增长。当以冗余的方式创建文件时,不同区域中相应的文件在任何时候都可能不同,但是在将来的某个时刻最终将达到相同的状态。如果一个文件在一个数据中心的大小为S1,在另一个数据中心的大小为S2,并且S1<S2,那么两个文件的前面S1个字节是完全相同的。
State Server:State Server是日志处理状态的全局复制源,并且是其他Ubiq组件之间沟通的通信中心。它由一个称为PaxosDB的同步的database服务使用Paxos在多数据中心之间执行一致性的数据复制。它存储关于哪些已经被处理,哪些还没被处理的元数据。对于每个输入文件和offset,它保存三个状态:
它通过合并处于相同状态的连续字节来有效的保存信息,即状态保存在<file, begin_offset, end_offset>的粒度上。
所有其他Ubiq的组件都与State Server交互。State Server从Log Data Tracker新数据到达的信息,使用元数据创建会通过Work Unit Retriever传递给Local Application Progressing的Work Unit,并且提交由Work Unit Committer提交已经完成的Work Unit到State Server。存储在State Server的元信息对Ubiq提供exactly-once语义非常重要。State Server禁止从Log Data Tracker接收任何重复的数据。所有的元数据操作,例如创建wokr unit,Work Unit Retriever检索work,Work Unit Committer提交work,都被State Server当做分布式事务,采用read-modify-write的方式执行在底层的存储设备上。
Log Data Tracker:Log Data Tracker的核心任务是发现输入日志中的数据增长,数据增长有两种方式:新创建的文件和已经存在的文件的内容的增长。Log Data Tracker持续扫描输入目录并且注册新文件的名称和当前的大小到State Server上。它同时也监测已经存在的文件的大小变化,当新数据增加时通知State Server。
Log Data Tracker独立运行在输入日志的数据中心或者附近,并且只通知运行在同一数据中心的State Server。一旦State Server完成对Log Data Tracker输入数据的去重,那么Log Data Tracker的设计将变得简单,只需要保证at-least-once语义。每个文件被至少一个Log Data Tracker追踪。每个update操作都被重试,知道State Server确认成功。
Work Unit Creator:Work Unit Creator作为State Server的一个后台线程运行,它的目标是将连续的增长的日志输入编程离散的work unit或者event bundles。Work Unit Creator维护文件在每个数据中心增长的最大的offset。它同时保存过去为此文件创建的work unit的offset。当创建新的work unit时,它会自动更新offset以确保输入每个输入字节正好属于一个work unit。为了防止饥饿,在创建work unit时会优先处理最旧的文件。同时Work Unit Creator会尝试确保一个work unit拥有多个不同文件的块,使得他们可以并行的被应用程序处理。
Work Unit Retriever and Work Unit Committer:这两个组件的目的是将本地应用程序的处理逻辑和Ubiq其他的组件完全隔离开。Work Unit Retriever的职责是从State Server获取未commit的work unit。它将这些work unit投递Local Application Processing组件并且通过全局系统状态追踪这些投递。一旦Local Application Processing处理完一个work unit,它请求Work Unit Committer进行一次提交。这将启动原子提交,一旦成功,全局系统状态将被更新来确保work unit中的事件不会再被重复处理。如果提交失败,将再次处理work unit以保证exactly-once语义。
Dispatcher:如果应用程序的结果是确定的,并且output storage期望at-least-once语义,Local Application Processing可以直接将结果投递给output storage system。否则由去重的Dispatcher组件负责将结果写入output storage。Dispatcher需要在State Server和output storage之间执行两阶段的commit来保证exactly-once语义。Ubiq目前支持结果到Mesa和Colossus。Ubiq拥有通用的API接口用于在未来支持更多的output storage。
Garbage Collector:一旦一个work unit被分发到output storage,一个负责垃圾回收的State Server的后台线程将回收work unit以及和它相关的元数据信息。这个线程同时负责回收文件名,一旦他们输入超过一定的天数(d days)并且他们已经全部被处理。State Server确保如果它从Log Data Tracker接收到一个文件,它的时间戳比d还老,它将会被丢弃。Log Data Tracker只会跟踪最多d天内的日志。
目前为止我们一直关注的是一个数据中心内的Ubiq的设计。Figure 2详细的展示了Ubiq在两个数据中心的架构。
Replication of critical state:在Ubiq中,需要跨数据中心保持一致性的是State Server中的global system state maintained。特别是必须在多个数据中心之间以强一致的方式维护全局状态信息,以确保不违反日志处理框架的Exactly-Once语义。这是通过使用PaxosDB完成的,如上一节的State Server所描述的。所有的元数据操作,例如Work Unit Retrievers进行work的创建,work的恢复,或者Work Unit Committers提交work,都被以跨数据中心的分布式事务的方式在State Server中执行。为了分摊单个事务的开销,我们使用了多个系统级优化,例如批量处理多个事务。
De-duplication of input from multiple logs data centers:如前所述,Ubiq希望输入的数据在多个数据中心达到最终一致。每个数据中心的Log Data Tracker独立的追踪相应输入日志的数据中心的数据增长。Work Unit Creator统一来自多个输入日志数据中心的数据来创建全局的work unit。它通过在State Server中位置一个key-value的数据结构来实现。key是文件名(不包含路径),在value中存储所有该文件在多个输入日志数据中的相关的元数据。如果输入数据只在一个输入的数据中心有效,它将会在work unit中对此进行标记,以便于work unit在一个就近的数据中心被处理。State Server在所有健康的数据中心之间统一的分配work unit,或者是根据用户给每个数据中心的Local Application Processing配置的资源进行分配。
Replication of output data:包含Local Application Processing结果的数据中心在结果被消费之前就关闭了是有可能的。为了处理这种情况,Ubiq必须能回滚已经提交的work unit以重新生成结果或者在提交work unit之前将Local Application Processing的结果复制到另一个数据中心。如果应用程序的业务逻辑是非确定性的,并且output storage已经部分处理了输出,那么回滚不是一个可选的方案。为了解决这个问题,Ubiq提供了Replicator组件。Replicator将文件从本地文件系统复制到其他数据中心的一个或者多个的数据中心。
Preventing starvation:尽管Ubiq不提供任何顺序的保证,但它确保不会出现“饥饿”。每个框架的组件都优先处理最老的work unit。例如,Work Unit Creator创建work unit时添加最老的数据;Work Unit Retriever检索最旧的work unit,以此类推。
根据应用程序的性质,输入数据的处理可以是(1)at-most-once语义、(2)at-least-once语义、(3)exactly-once语义,或者(4)极端场景下没有一致性保证。鉴于Ubiq必须是通用的才能在所有的应用程序中使用,所以它提供了exactly-once语义。支持这种一致性保证显而易见的会带来同步的开销;但是,我们的经验是大量的应用程序都需要使用exactly-once语义来处理他们的输入日志,特别是像广告计费、用户付款这样的系统。如2.3节中提到的,Ubiq通过对来自多个数据中心的输入进行去重来实现exactly-once语义,所有的输入数据的元数据操作作为Paxos的分布式事务在State Server上执行,并且确保没有饥饿的情况。
注意,对于利用Ubiq实现exactly-once保证的系统,必须保证编写的代码没有副作用。Ubiq的设计不提供任何顺序性相关的保障;它限制数据的处理是相互独立的。但是Ubiq保证输入事件不会饥饿(一定会被处理)。
以下内容是Ubiq如何在单数据中心处理机器故障:
如果数据中心是full outage模式,它是完全无法响应的。数据中心如果是partial outage模式,那么可以响应,但是性能和可用性会显著的下降。虽然full outage和partial outage都是通过将工作迁移到健康的数据中心来解决的,但他们的影响是有一些差异的。
Impact of full data center outage:Google提供专门的服务,持续监控数据中心的中断并通知感兴趣的系统;Ubiq通过这些外部的信号量感知数据中心的故障。通常状况下,Ubiq均匀的将work unit分配给所有健康的数据中心。在数据中心出现full outage的情况下,Ubiq停止给不健康的数据中心分配任何work unit。已经分配给不健康的数据中心的work unit将会被立即重新分配给健康的数据中心。一旦发生完全中断的情况,整个工作负载将有剩余的健康的数据重新处理。假如剩余的健康的数据中心能承担这些负载,那么对端到端的延迟没有影响。
Impact of partial data center outage:不像完全中断,没有直接的信号量或者监控来检测数据中心的部分中断。因此,我们需要在Ubiq内部建立处理部分中断的机制。如3.2节中提到的,ETA的概念使我们可以在同一个数据中心内拥有备份的woker。但是,在数据中心部分中断的场景下,不健康的数据中心中的备用worker可能持续在处理相同的work unit,导致部分work unit的饥饿。为了防止这种情况的发生,我们在State Server上也有ETA,作为data center的ETA。我们将一个work unit分发给一个数据单元,然后将数据单元的ETA记为T,如果在T时间内work unit没有被提交,它将被其他的数据中心处理。这保证了如果一个数据中心故障或者它无法在特定的SLA下完成工作,另一个数据中心的备用worker会自动接管这个工作。因此,当出现部分中断的情况,工作的迁移不会立即发生,而是等待data center ETA的超时。健康的数据中心只在数据中心的ETA过期之后接管超时的work unit。在实践中,data center ETA总是被设置成比local ETA大一个数量级。Data center ETA会导致延迟增加,如果不希望增加延迟,可以将data center ETA的值调小,但是代价是更多的资源消耗。
注意,并不能因为data center ETA的存在而移除local ETA。本地应用程序处理通常执行部分数据处理的中间本地检查点。 拥有local ETA允许同一数据中心的备份工作人员从这些检查点恢复处理。
这种设计的结果是整个Ubiq架构能够适应部分和全部数据中心的中断; 此外,它可以从N个数据中心动态重新配置到N0个数据中心,这使我们以7*24的方式运行日志处理流水线的操作任务明显更易于管理。
如上所述,除了State Server,所有的Ubiq组件都是无状态的,这意味着它们可以进行扩容而不引起一致性的问题。
为了确保State Server不成为扩容性的瓶颈,配置信息采用key的概念来对work在多个机器之间进行分区:输入文件的名称被转换成一个int值,通过这个值对分区数取模来完成分区。每个机器负责一个单独的分区。
为了保证State Server可扩展,我们必须在系统运行时支持动态的分区信息重配置。我们通过为不同的时间窗口保存配置信息来完成动态的分区信息变更。每个输入文件名会基于一个全局的时间工具编码一个不可变的时间戳以确保时间信息在所有regions之间都是一致的。State Server拥有一个分配给他的time range。可能是<5:00 AM Toady>到<5:00 PM Today>拥有10个分区,而<5:01 PM Today>拥有20个分区。在这个过程中,State Server根据时间戳决定使用哪个分区规则,直到他安全的转换到一个新的配置。
Ubiq的设计是可扩展的。从应用程序开发者的角度看,Ubiq将连续的分布式数据的处理转换成了本地的离散的日志记录块的处理。Ubiq的API可以被任何特定应用的代码使用,因此可以轻松的集成到各种应用的上下文中。应用程序开发者只需要提供日志处理代码以及一些配置信息,例如输入日志的文件名,分区数量,数据中心数量等。
现在我们描述在Google如何使用Ubiq框架部署关键应用。此应用的目标是不断的将日志数据转换和聚合为更高维度的数据,将结果下沉到下游的存储系统中,例如Mesa这样数据分析的系统。如第2部分描述的,Ubiq将处理职责分为(1)一个专注于增量work管理、metadata管理、work unit创建的通用框架;(2)专门的local application processing,关注处理输入事件处理的应用程序逻辑。该应用程序具有以下职责:
图3说明了上述使用Ubiq框架的应用程序。应用程序开发者只需要实现Data Transformer&Aggregator子组件。该组件拥有良好定义的API用于连接用户和Ubiq框架。上面的部署使用了Replicator组件,因为底层业务逻辑处理是非确定性的。
如图4,生产环境的Ubiq采用高度分散的方式部署。输入日志至少在两个区域的数据中心冗余可用,Data Center1和Data Center2。Ubiq Pipeline至少在三个数据中心活跃,Data Center A、Data Center B和Data Center C。为了保持数据在局部处理,Data Center A和Data Center C靠近Data Center1,Data Center B靠近Data Center2。全局的系统状态虽然显示为集中式的组件,但通常至少在5个不同的数据中心以同步的方式维护。如果Data CenterB出现部分或全部中断,Data Center A和Data Center C将开始共享工作负载,不需要任何的人工接入且不违反SLA。这里假设了Data Center A和Data Center C有足够的资源来扩容Ubiq的组件来承载额外的资源。
接下来给出一些关键指标以突出Ubiq框架的性能特性。在Google,Ubiq被部署用于数十种不同的日志类型,这实际上意味着我们有数十种不同的管道,不同的数据速率被连续处理。典型pipeline的规模大约为每秒几百万个数据输入,每秒产生几百万个数据输出。本节的数据来自于两个这样的pipeline。
Throughput and latency during normal periods:图5说明了每个数据中心Ubiq的吞吐。我们观察到负载在两个数据中心均衡的分布。图6说明了在和图5相同的时间,90%的处理输入日志的延迟。延迟的计算规则是数据从进入第一个Ubiq组件到最终输出到外部存储。基于我们内部测试,当没有应用处理逻辑时,90%的延迟都在一分钟以下。因此额外的延迟来自于应用程序对这个特定日志类型的处理。
Impact of full data center outage:图7和图8分析了当出现数据中心完全中断时的行为。图7中,一个数据中心完全中断,结果是增加了另一个数据中心的负载。但是图8中90%的延迟数据表明,延迟并没有收到work迁移的负面影响。如3.3节说明的,Ubiq获取一个外部的数据中心中断的信号量,然后立即将工作负载转移到健康的数据中心。每个数据中心都配置为处理完整的负载。 请注意,在整个数据中心停机期间,在很短的时间内出现了大幅度的延迟峰值。 这是因为输入事件的数量大幅增加。
Impact of partial data center outage:图9和10描绘了存在部分数据中心中断时系统的行为。图9中,在5:30AM左右,一个数据中心出现部分中断的情况,它的吞吐急剧下降,而另一个数据中心则增加了额外的负载。图10展示了90%的延迟:实际上,在部分中断期间,处理输入日志事件的延迟显著增加。如3.3节中所示,这是因为工作的迁移需要在数据中心的ETA超时之后。
总结而言,Ubiq框架透明的处理了部分中断和全部中断的情况,不需要人工干预。
本节中将简要介绍我们从构建生产环境中用于连续处理数据流的大型框架学到的经验教训。核心的一点是在设计大型基础设施时为异常做好准备,因为在我们的规模上,一些低概率的事情会发生并且导致严重的问题。
Data corruption:作为一个基础架构团队,我们会考虑整个系统中各个组件的软件和硬件故障。但是,由于堆栈中的软件和硬件故障低得多,因此在计算数据损坏时会遇到一个重大挑战。Ubiq运行的规模增加了在生产中看到这些错误的机会。 此外,本地应用程序处理组件内或上游系统中的业务逻辑可能存在错误。 这可能导致嵌入在Ubiq中的本地应用程序处理组件由于少数不良事件而导致整个工作单元失败。
我们已经构建了若干个解决方案用于解决数据损坏的问题。第一种方法是提供详细的报告工具,允许应用程序开发人员识别出现问题的输入工作单元的确切字节范围。此外,我们更进一步,其中具有关于损坏的字节范围的诊断的失败的工作单元自动分成多个部分:损坏的字节范围和未损坏的字节范围。未损坏的范围作为新的工作单元排队,并且将损坏的字节范围报告给Ubiq客户端以供进一步调查。 这可确保可以成功处理与原始工作单元关联的所有未损坏的字节范围。
Automated workload throttling:尽管Ubiq的设计具有高度的可扩展性,但在实践中,由于外部因素,系统内部会出现瓶颈。例如,当系统请求额外的机器资源来扩展本地应用程序处理组件时,如果配置有延迟,则Ubiq中将存在工作负载累积。如果不采取措施,这可能会对整个系统的运行状况产生负面影响,或者可能不必要地向Google的Borg系统发起多个资源配置请求,导致以后资源利用不足。 为了避免这些问题,我们在Ubiq的每个阶段都构建了监控和工作负载限制工具,以便在Ubiq发现下游组件过载时限制上游组件的工作生成。
Recovery:尽管Ubiq本身在多个数据中心复制其状态,但对于关键业务应用程序,我们通过在每个工作单元的输出存储系统中保留额外的元数据来防范整个Ubiq管道的故障。此元数据跟踪用于生成输出的输入文件名/偏移列表。 在Ubiq失败的情况下,我们可以从输出存储系统和输入日志中读取此元数据,以在状态服务器中引导Ubiq的状态。 理论上,如果运行Paxos分区的大多数计算机运行状况不佳,则可能导致State Server中的状态损坏。 实际上,更有可能导致Ubiq失败的原因是代码中的意外错误导致状态服务器中的状态不一致。
在过去十年中,已经出现了大量关于数据流连续处理的研究[1,2,7,9,10]。这些系统中的大多数是研究原型,并且重点是开发用于处理数据流上的连续查询的声明性语义。在过去几年中,管理连续数据的需求在基于互联网的应用程序和服务的背景下变得尤为重要。诸如Storm [6],Samza [5],Spark Streaming [17],Apache Flink [4]和Heron [14]等系统可在开源域中使用,以便在存储之前连续转换信息。但是,这些系统都不是multi-homed; 它们在单个数据中心运行,因此容易受到数据中心中断的影响。唯一已经发布的,即使在数据中心故障的情况下依旧提供mulit-homing和强一致保障是geo-replicated的Google的Photon [3]。Ubiq和Photon之间核心的差异是Photon的目标是event-level级处理,而Ubiq支持将多个event组合成work unit处理。Photon和Ubiq之间处理粒度的差异导致设计和性能权衡的以下差异:
在本文中,我们介绍了一个可扩展框架的设计和实现细节,以便以事件包的形式连续处理数据流。我们举例说明了Ubiq框架如何在Google的实际生产应用中使用。 Ubiq设计的一个关键方面是明确地将框架的系统级组件与应用程序处理分开。这种可扩展性允许无数的应用程序利用处理框架而无需重复工作。 事实证明,Ubiq的可扩展性是许多应用程序使用的强大范例,尽管它最初是为了简化少数非常大的客户的操作问题而设想的。Ubiq的另一个关键特性是它提供了exactly-once语义。 虽然没有排序保证,但只需exactly-once语义就可以使应用程序逻辑变得更加简单:应用程序开发人员不必使处理逻辑复杂化以处理丢失或重复的数据。为了应对输入数据速率的高度可变性,Ubiq的设计具有高度可扩展性和弹性:可以动态配置或移除其他资源,而不会影响操作系统。数据中心级别的组件故障通过以交错方式冗余处理工作单元来处理。 最后,Ubiq的多宿主设计使其有效地透明地处理全部和部分数据中心中断,无需任何人工干预。在未来,我们计划为Ubiq开发面向服务的架构,以实现更有效的访问控制,隔离和资源管理。 我们还在探索使用机器学习模型进行精细级资源管理和预测控制。
本文分享自 MessageQueue 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!