DDIA 读书分享会,会逐章进行分享,结合我在工业界分布式存储和数据库的一些经验,补充一些细节。每两周左右分享一次,欢迎加入,Schedule 在这里[1]。我们有个对应的分布式&数据库讨论群,每次分享前会在群里通知。如想加入,可以加我的微信号:qtmuniao,简单自我介绍下,并注明:分布式系统群。
在第十章的时候,我们讨论了批处理——它总是读取一些文件作为输入,产生一些新文件作为输出。这里的输出就是一种“衍生数据”:即,如果有需要,我们可以通过再跑一遍批处理任务获取相同的结果集。从之前章节的讨论我们可以看出,这种思想简单却强大:像搜索引擎、推荐系统、分析系统等很多现代常见的数据系统都是基于这种思想构建的。
然而,在第十章进行讨论时我们有一个很强的假设:输入数据集是有界的——即事先知道输入尺寸——因此批处理的程序知道输入何时结束。举个例子,MapReduce 中非常重要的排序操作,就必须读入所有待排序的输入数据后才能开始排序并输出。这是因为,最后一条数据,没准可能是被需要排在最前面(具有最小的 key),因此不可能过早对数据排序。
但在现实中,很多数据都是无界的且随着时间持续到来的:我们的(各种服务的)用户昨天会产生数据、今天会产生数据,明天也将以同样的方式继续产生数据。除非你关门大吉,否则这些程序将会永无休止地工作,因此我们的数据库永远也不会到达一个“终态”(complete state)。因此,如果使用批处理的思想来处理这种持续来到的数据流,就会引出一个数据集切分的问题:例如,在一天结束时处理这一整天的数据、在每小时结束时处理这一小时的数据等等。
但上述切分+批处理的方式有个问题:太慢了,用户可能等不及。比如按天处理时,则其处理结果只有当这一天结束后,再花些时间去批处理,才能最终看到结果。为了降低这个延迟,我们确实可以用更小的粒度进行处理——比如,每秒进行一次处理。甚而,干脆抛弃时间分片的概念,任意数据到来的时候就触发数据处理逻辑。这就是流式处理(steam processing)背后的基本思想。
通常来说,一个“流”(steam)指的是随时间推移而增量产生的数据。这个概念其实很多地方都有:Unix 中标准输入输出中(stdin、stdout),编程语言中(迭代器),文件系统相关的 API 中(如 Java 的 FileInputStream
),TCP 连接中,网络中传输的音视频等等。
在本章中,我们会将事件流(event stream)当做一种数据管理机制:即将我们上一章讨论的批量数据无界化、增量化。我们首先会讨论如何表示、存储和传输数据流。在“数据库和数据流”一节中,我们会探索数据流和数据库的管理。最后,在“处理数据流”一节中,我们将会讨论对这些不间断的数据流进行处理的方法和工具,以及基于其构建应用的一些方法。
在批处理系统中,任务的输入和输出都是文件(可能是单机文件系统中的、也可能是分布式文件系统中的),那么在流式系统中,承载输入和输出的是什么呢?
在批处理系统中,虽然输入是文件,但第一步也通常是解析成一系列的数据记录(records)。在流式处理的上下中,对应数据记录的实体通常被称为事件(event)。但他们本质上都是一个东西:一段小的、自包含的(self-contained、不引用其他数据)、不可变的某个时间点发生的信息数据。流式系统中的一个事件通常会包含一个时间戳,来标志该事件在某个时钟系统(time-of-day clock)中发生的时间点。
下面举几个事件的例子。事件可以是由用户活动产生的,如浏览网页、网上购物;也可以由机器产生,如周期性的温度传感器、CPU 利用率指标;在使用Unix工具进行批处理一节的例子中,我们提到的 web 服务器中的每一行日志,也是一个事件。
我们在第四章中讨论过数据编码的事情。事件本质上也是数据,因此可以被编码为字符串、JSON 或者二进制形式。只有编码之后,事件才能被存储,如:
也只有在编码之后,事件才能够在网络中进行传输,以发送到其他工作节点进行处理。
在批处理系统中,一个文件通常是一次写多次读的。类似的,在流式处理系统中,一个事件在被生产者(producer,在不同系统中,也可以称为 publisher 或者 sender)生成之后,可能会被多个感兴趣的消费者(consumer,对应的,也可以称为 subscribers 和 recipients)处理。在文件系统中,文件名可以标识一组数据记录;在流式系统中,相关的事件通常会聚拢到主题(topic)下或者流(stream)中。换句话说,命名后的流类似于文件,但不同的是,流中的是无界数据。
原则上,使用文件或者数据库也足够用以沟通生产者和消费者:
批处理系统在以天为粒度处理数据时,正是用的这种办法。
但是,在放到低延迟的持续数据流的上下文中时,如果存储系统不是专门为此定制的,定时去拉取(polling)数据的代价会变得很高。且,在数据量一定的情况下,你拉取的频次越高,单次拉到新数据的概率就越低,则无效负载也会随之升高。因此,在流式系统中,当有新事件产生时,按需通知消费者会比频发拉取更高效(即推比拉高效)。
传统上,数据库对于这种通知机制支持的并不是很好:虽然关系型数据中的确有触发器(triggers),且可以对数据表中的一些事件(如,新插入一行)做出响应,但响应逻辑中能做的很有限(比如做一致性检查),且通常局限在数据库内部(而不能通知到客户端)。为此,一些专用的工具被开发出来以进行专门的事件通知。
通知消费者有新事件产生的一个常见方法是消息系统(messaging system):生产者将事件以消息的形式发送到消息系统,消息系统将其推送给消费者。我们在经由消息传递的数据流一节简单提过消息系统,本节我们将会讨论更多细节。
实现消息系统最简单的方式,就是使用 Unix 管道或者 TCP连接来沟通生产者和消费者。但大部分消息系统不会如此简单。比如,Unix 管道和 TCP 连接都是一对一的发送者和接受者,但成熟的消息系统通常要支持多对多的生产消费——即多个生产者可以将数据发送到一个主题( topic )下,多个消费者可以共通消费这个 topic。
但在这种发布/订阅(publish/subscribe)模式之下,不同具体的系统实现方式千差万别。没有一种方案能满足所有需求。为了理解不同系统的实现,我们可以带着两个问题去考察各个系统:
是否能够接受消息丢失取决于应用层。例如,对于一些周期性上报的传感器读数来说,偶尔的一两个采点的丢失影响不大, 因为后面的数据会很快的报上来。然而需要注意,如果消息大面积的丢失,可能也很难立即看出来。另外,如果你的目标是对所有到来的事件进行计数,则每条信息都要可靠的传输,因为任何一条信息的丢失都会导致计数错误。
我们在上一章中讨论过批处理的一个非常友好的性质——提供很好的容错保证。即,所有失败的子任务会自动的进行重试、所有失败任务的部分输出会被丢弃。这种做法会让系统看起来像没有发生过任何故障一样,从而可以让应用层大大简化编程模型(这些分布式故障如果系统不处理,就要应用层自己来处理)。在本章稍后的部分,我们会探讨如何在流式处理的上下文中提供类似的保证。
很多消息系统并不借助中间系统节点,而直接使用网络来沟通生产者和消费者双方:
这种直接消息系统在其目标场景中通常能够工作的很好,但需要应用层代码自己承担、处理消息丢失的可能性。此外,这些系统能够进行的容错很有限:虽然这些系统在检测到丢包后会进行重传,但它们通常会假设生产者和消费者都一直在线(这是一个很强的假设)。
如果消费者由于某种原因下线了,它可能会错过一些消息。有些协议会允许生产者重发失败的消息,但如果生产者也挂了,这种方法也无济于事——生产者会丢掉保存有需要进行重试的消息缓存。
这本质上是因为,这些没有 broker 的消息系统多表现为库的形式,本身是没有状态的。如果没有状态,就没有办法应对消息传输过程中生产者、消费者宕机重启的故障。这也是引入 broker 的初衷,但因此消息系统也会变的更加重。
一种广泛使用的替代方案就是使用消息代理(message broker,也称为消息队列)来发送消息。消息代理本质上是一种专门为消息数据优化过的数据库。它通常以进程的形式跑在服务器上,生产者和消费者作为客户端与之通信。生产者将消息写入消息代理,消费者从其中读取以进行消费。
通过引入一个消息数据存储代理,消息系统可以更加容易的对客户端(包括生产者和消费者)的来来去去(连接、失联和宕机)进行容错。这样,数据的持久化职责被转移到了消息代理上。有些系统中的消息代理将数据保存在内存中,那么宕机重启就仍然有问题;但另一些系统中的消息代理就会把消息持久化到硬盘(通常可配置)中,则就可以容忍宕机问题。如果遇到慢的消费者,就可以使用无限队列的方式(而不是丢消息或者背压)对没来得及消费的数据进行缓存,当然通常来说,能够存多少数据通常也会以配置的方式交给用户去选择。
使用消息代理的另外一个原因是消费者通常是异步消费的:即当发送一条消息后,生产者等待消息代理确认收到(缓存或者持久化)就会结束,而不会去等待这条消息最终被消费者所消费。而消息最终被消费者所消费,会发生在将来的某个时间点——大多数很快,比如几秒内,但如果出现大量消息积压时,这个时间也可能会很久。
有一些消息代理甚至能够参与两阶段提交(使用 XA 或者 JTA,参见 实践中的分布式事务 )。这种功能让消息代理看起来非常像数据库,尽管在实践中他们有一些非常重要的区别:
以上都是传统视角下的消息代理,这些语义被抽象成了像 JMS 和 AMQP 之类的协议,并且为 RabbitMQ、ActiveMQ、HornetQ、Qpid、TIBCO 企业消息服务、IBM MQ、Azure Service Bus 和 Google Cloud Pub/Sub 等系统实现。
当多个消费者同时消费一个 topic 下的数据时,有两种主要的消费方式,
负载均衡和扇出模式对比
两种消费模式也可以组合起来:如有两组用户都订阅了某个 topic,组间进行独立消费(fan-out)、组内进行互斥消费(load balancing)。
消费者可能会在任意时刻宕机,因此可能会出现:消息代理将消息发送给了消费者,但是消费者却没有对其进行消费或者仅进行了部分消费,就宕机了。为了保证该消息不丢,消息代理使用了一种确认机制(类似 TCP 中的 ack):每个消费者必须显式地告诉消息代理它消费完了消息,这样消息代理才能安全的将消息从队列中删除。
如果消息代理和消费者之间的链接关闭或者超时了,消息代理仍然没有收到确认,则会假设消息没有被处理,并且重新给另一个消费者发送消息。但此时有可能出现,在重发之前消息实际已经被处理过了,只是确认消息由于网络的原因丢失了。在这种情况下,需要消费者进行幂等消费。
在负载均衡模式下,重传可能会造成消费者处理消息的乱序。在下图中,在没有任何故障时,消费者大体是按照消息的生产顺序来消费的。然而,某一时刻,消费者 2 号在处理消息 m3 时宕机了,此时消费者 1 号正在处理消息 m4。由于迟迟没有等到 m3 的消费确认,消息代理将其重新发送给了消费者 1 号,从而导致消费者 1 号以 m4,m3,m5 的顺序来处理的消息。即,发生了乱序处理。
负载均衡导致的消息乱序
即使消息代理试图以顺序的方式给消费者发送消息(JMS 和 AMQP 都有此类规定),但由于负载均衡和重传机制的组合,乱序消费难以避免。为了避免这个问题,你可以让每个消费者使用单独的队列(即,不用负载均衡功能,也可以理解,毕竟并行总是有代价的)。在每条消息都是互相独立时,乱序消费不是问题;但如果消息间有前后因果依赖,则消息的保序消费非常重要。
[1]
DDIA 读书分享会: https://ddia.qtmuniao.com/
题图故事
中国美术馆拍的卖柑子的故事