1. backpressure-背压
backpressure后面一律叫做背压。消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少。
2. Spark Streaming的backpressure
阅读本文前,需要掌握:
1. PID控制器
StreamingListener
Spark Streaming 跟kafka结合是存在背压机制的。目标是根据当前job的处理情况,来调节后续批次的获取kafka消息的条数。 为了达到这个目的Spark Streaming在原有的架构上加入了一个RateController,利用的算法是PID,需要的反馈数据是任务处理的结束时间,调度时间,处理时间,消息条数,这些数据是通过StreamingListener体系获得,然后通过PIDRateEsimator的compute计算得到一个速率,进而可以计算得到一个offset,然后跟你的限速设置最大消费条数做比较得到一个最终要消费的消息最大offset。 当然也要限制计算的最大offset小于kafka分区的最大offset。
3. 背压源码赏析
背压源码赏析我们采用的源码是Spark Streaming 与 kafka 0.10版本的结合。
3.1 代码入口
入口当然还是,在其第一行,计算当前要生成的KafkaRDD最大截止offset。
3.2 获取kafka 分区最大offset
求kafka 分区最大offset过程是在latestOffsets()方法中。该方法有两个目的:
获取新增分区,并使其在本次生效。
获取所有分区在kafka中的最大offset。
3.3 限制不超kafka 分区的最大offset-clamp
由于,该过程是个嵌套的过程所以,我们在这里是从外层往内层穿透。
限制不超kafka 分区的最大offset实现方法是clamp,需要的参数是通过获取的kafka所有分区的最大offset。具体实现代码如下:
这里面主要是将通过maxMessagesPerPartition计算出来的offset和kafka 分区最大offset做比较,限制当前要消费的最大offset不超过kafka可提供的offset。
3.4 计算最大offset-maxMessagesPerPartition
计算每个分区的要消费的最大offset总共有以下几个步骤:
1. 计算最大 rate msg/s
第一种情况,PID计算器计算的Rate值存在
第二种情况,PID计算器计算的Rate值不存在
2. 计算得到最大offset
该值要区别于kafka 分区的最大offset,这个最大offset是通过计算出来的rate,乘以batch time得到的,可能会超过kafka 分区内最大的offset。
3.5 PID算法计算出 rate的过程
rate 是针对所有分区的,不是每个分区一个rate。
rate 计算是通过pid算法,需要通过StreamingListener体系获取以下四个数据指标:
处理结束时间 batchCompleted.batchInfo.processingEndTime
处理时间 batchCompleted.batchInfo.processingDelay
调度延迟 batchCompleted.batchInfo.schedulingDelay
消息条数 elements.get(streamUID).map(_.numRecords)
针对pid算法的计算肯定简单来说主要由以下几个步骤:
计算偏差
计算积分
积分的思路,调度延迟*处理速率得到总的未处理消息数,再除以批处理时间得到的值,可以视为就是历史累计速率,也即是积分部分。
这里假设处理速率变化不大。
计算微分
计算新的速率
更新参数
具体计算过程在PIDRateEstimator的compute方法。
PID控制器参数的配置
3.6 数据获取
pid计算器是需要数据采样输入的,这里获取的数据的方式是StreamingListener体系。
DirectKafkaInputDStream里有个叫做DirectKafkaRateController类。
每个批次执行结束之后,会调用RateController的
在computeAndPublish方法中,调用了我们的PIDRateEstimator.compute
StreamingListener加入StreamingListenerBus是在JobScheduler类的start方法中
领取专属 10元无门槛券
私享最新 技术干货