流处理系统通常需要优雅地处理反压(back pressure)问题。反压通常产生是由于短时间内负载高峰导致系统接收数据的速率远高于它处理数据的速率。比如,垃圾回收停顿可能导致流入的数据快速堆积,后者双十一等造成流量陡增。反压如果不能够得到很好地处理,可能会导致资源好近甚至系统崩溃。
目前主流的流处理框架Storm、JStorm、Spark Streaming以及Flink等都提供了反压机制,各自的侧重点和实现都不相同。
对于开启了acker机制的Storm程序,可以通过设置conf.setMaxSpoutPending参数来实现反压效果,如果下游bolt处理速度跟不上导致spout发送的tuple没有及时确认的数量超过了参数设定的值,spout就会停止发送数据。
这种简单粗暴的方式主要有以下几个缺点:
Storm1.0中版本中使用了新的自动反压机制,社区解决方案如下:
反压过程:
JStorm的限流机制,当下游bolt发生阻塞的时候,并且阻塞task比例超过某个比例的时候,会触发 启动反压限流。
其中判断bolt是否发生阻塞是通过连续n次采样周其中,队列超过某个阈值,就认为该task处于阻塞状态。
根据阻塞的component,进行反向DAG推算,直到推算到源头spout,将topology的一个状态位设置为 “限流状态”。
task出现阻塞时,将自己的执行线程时间传递给TM(topology master),当启动反向限流后,TM把这个执行时间传递给spout。这样spout每次发送一个tuple,就会等待这个执行时间。而当spout降速之后,发送过阻塞命令的task检查队列水位是否连续n次低于某个阈值,如果是,就会发送解除限流命令给TM,TM然后发送提速命令给所有的spout,这样spout每次发送一个tuple就会减少等待时间,当spout的等待时间降为0,spout就会不断地向TM发送解除限速给TM,当所有降速的spout都发了解除限速命令,那么就会将topology的状态设置为正常,标志真正解除限速。
配置示例如下所示:
## 反压总开关
topology.backpressure.enable: true
## 高水位 -- 当队列使用量超过这个值时,认为阻塞
topology.backpressure.water.mark.high: 0.8
## 低水位 -- 当队列使用量低于这个量时, 认为可以解除阻塞
topology.backpressure.water.mark.low: 0.05
## 阻塞比例 -- 当阻塞task数/这个component并发 的比例高于这值时,触发反压
topology.backpressure.coordinator.trigger.ratio: 0.1
## 反压采样周期, 单位ms
topology.backpressure.check.interval: 1000
## 采样次数和采样比例, 即在连续4次采样中, 超过(不包含)(4 * 0.75)次阻塞才能认为真正阻塞, 超过(不包含)(4 * 0.75)次解除阻塞才能认为是真正解除阻塞
topology.backpressure.trigger.sample.rate: 0.75
topology.backpressure.trigger.sample.number: 4
Spark Streaming程序中当计算过程中出现batch processing time 大于 batch interval的情况时,(其中batch processing time为实际计算一个批次花费时间,batch interval为Streaming应用设置的批处理间隔),意味着处理数据的速度小于接收数据的速度,如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟),可以通过设置参数spark.streaming.receiver.maxRate来限制Receiver的数据接收速率,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。
Spark Streaming反压过程主要是根据JobSchedule反馈作业的执行信息来估算当前的最大处理速度(rate),然后动态地调整Receiver数据接收率。
反压执行过程主要分为两部分:BatchCompleted事件触发 以及 BatchCompleted事件处理
BatchCompleted事件触发:
BatchCompleted事件处理:
以上两个过程便将反压机制中最重要的rate调整完成。
当Receiver开始接收数据的时候,需要获取令牌才能够将数据存放入currentBuffer,否则的话将被阻塞,进而阻塞Receiver从数据源拉取数据。其中令牌投放采用令牌桶机制(参考下图),固定大小的令牌桶根据rate源源不断地产生令牌,如果令牌不消耗,或消耗的速度小于产生的速度,令牌就会不断的增多,直到把桶撑满。后面再产生的令牌就会被丢弃。
Flink 在运行时主要由 operators 和 streams 两大组件构成。每个 operator 会消费中间态的流,并在流上进行转换,然后生成新的流。在 Flink 中,这些逻辑流就好比是分布式阻塞队列,而队列容量是通过缓冲池(LocalBufferPool)来实现的。每个被生产和被消费的流都会被分配一个缓冲池。缓冲池管理着一组缓冲(Buffer),缓冲在被消费后可以被回收循环利用。
上图展示的是两个task之间的数据传输:
数据传输有两个场景:
本地传输:如果Task 1和Task 2在同一个worker即诶但,buffer可以直接传递给下一个Task。Task 2消费了该buffer,那么就会被LocalBufferPool1回收。如果Task 2消费的速度比Task 1取buffer的速度,导师LocalBufferPool1无可用的buffer,Task1等待在可用的buffer上。最终导致Task1的降速。
网络传输:如果 Task 1 和 Task 2 运行在不同的 worker 节点上,那么 buffer 会在发送到网络(TCP Channel)后被回收。在接收端,会从 LocalBufferPool 中申请 buffer,然后拷贝网络中的数据到 buffer 中。如果没有可用的 buffer,会停止从 TCP 连接中读取数据。在输出端,通过 Netty 的水位值机制来保证不往网络中写入太多数据。如果网络中的数据(Netty输出缓冲中的字节数)超过了高水位值,我们会等到其降到低水位值以下才继续写入数据。这保证了网络中不会有太多的数据。如果接收端停止消费网络中的数据(由于接收端缓冲池没有可用 buffer),网络中的缓冲数据就会堆积,那么发送端也会暂停发送。另外,这会使得发送端的缓冲池得不到回收,writer 阻塞在向 LocalBufferPool 请求 buffer,阻塞了 writer 往 ResultSubPartition 写数据。
通过固定大小的缓冲池,保证了Flink有一套健壮的反压机制,使得Task生产数据的速度不会快于消费的速度。
http://blog.csdn.net/cm_chenmin/article/details/52936575
https://github.com/alibaba/jstorm/wiki/Backpressure
http://www.jianshu.com/p/906db0d86653
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有