贝壳实时计算平台已承接公司各种实时数据任务100多个,日处理消费总量接近250亿,采用的底层引擎包括Spark-Streaming和Flink。为了提升数据处理速度和稳定性,我们对各种引擎的处理特性进行了深入研究。在Spark-Streaming中,对流的抽象是使用DStream来定义的,所以想要理解Spark-Streaming的流处理模型,理解DStream的内部实现以及其如何构建和运行是很有必要的。
我们在定义一个流的处理逻辑时,首先从一个数据的流入源开始,这个数据源使用InputDStream定义,它是DStream的一个子类,之后我们会在其上调用一些tranform类型算子,像map,reduce,filter等等,每调用一个算子,都会创建一个新的DStream,每一个新创建的DStream都保留着当前节点所依赖的上一个节点和当前节点的执行逻辑这两个部分,这样,多个DStream节点就构成了一个逻辑执行链。
比如如下代码会生成图1的执行链:
stream.map(...).filter(...).foreachRDD(...)
虚线箭头描述的是依赖关系
最后当调用action类型的算子时,所有action类型的算子,底层都是通过ForEachDStream实现的。
我们来看ForEachDStream的源码:
private[streaming] class ForEachDStream[T: ClassTag] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean
) extends DStreamUnit {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[Unit]] = None
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
}
这里我们关注generateJob方法,这里调用了它依赖的父DStream的getOrCompute来生成一个需要它来处理的RDD,然后对该RDD做该节点本身需要做的一些操作,即foreachFunc闭包,其实所有DStream的getOrCompute方法底层都会调用compute方法,该方法会返回RDD,即所有的DStream的compute方法中,要么其本身能够从外部拉取数据,即InputDStream作为DStream链的第一个节点,要么其本身调用依赖的上游DStream的compute方法,再对生成的RDD做其本节点所定义的一些操作作为其返回值。
如此,当DStream链的最后一个节点被调用了compute方法时,它能够依次递归的调用逐节点的compute方法,最后调用第一个InputDStream节点的compute方法生成一个能够拉取外部数据的RDD。
其调用的时序图如下:
以上只是为了直观的理解DStream链是如何工作的,具体体现在分布式环境上时,是由RDD来定义操作,切分成task后由Executor来执行。
另外需要说的是如果我们在单个流上定义一系列除window外的操作,其和我们直接调用InputDStream的foreachRDD后,在rdd上定义操作是等效的。
除了上面介绍的DStream之外,在Spark-Streaming内部还有一些保存作业处理逻辑的模块和用于根据时间生成和管理每个批次数据的模块。下面是在SparkStreaming中一些比较核心的类,他们是构成一个流式作业,和使其运行起来的框架。
首先看一下这四个类交互的时序图:
图中只画了一些比较重要和核心的类和逻辑。JobGenerator每隔我们设定的时间间隔会生成一个JobGeneratorEvent事件用于触发生成一个作业。其内部是通过RecurringTimer类和EventLoop实现的。
代码如下:初始化timer和eventLoop。
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started
// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
// See SPARK-10125
checkpointWriter
eventLoop = new EventLoopJobGeneratorEvent {
override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = {
jobScheduler.reportError("Error in job generator", e)
}
}
eventLoop.start()
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
这里processEvent方法用来做消息分发,根据消息的不同类型调用不同的函数进行处理。
private def processEvent(event: JobGeneratorEvent) {
logDebug("Got event " + event)
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time, clearCheckpointDataLater) =>
doCheckpoint(time, clearCheckpointDataLater)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
startFirstTime中调用了timer和DStreamGraph的start方法,将二者内部的事件循环线程启动起来。
到这里我们知道当timer根据时间生成GenerateJobs事件时,会触发generateJobs函数的调用。
我们来看generateJobs的代码:
private def generateJobs(time: Time) {
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
其内部又调用了DStreamGraph的generateJobs方法用来生成多个Job,之后通过JobScheduler对这些Job进行提交。
DStreamGraph底层生成作业的过程是DStreamGraph实现的,它会遍历所有注册过的ForEachDStream,并分别调用他们的generateJob方法,返回一个Job对象,这就跟我们上面讲过的ForEachDStream部分关联上了。
Job里面包含了一个需要在其上执行计算的RDD,包括所有计算逻辑的闭包,而这个闭包真正执行,是在JobScheduler将这个Job对象提交到一个线程池之后,其会在线程池内执行这个Job对象内的闭包逻辑,将其转换成分布式计算的task分发到不同的节点上去执行。
JobScheduler.submitJobSet的如下代码:
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
jobExecutor就是那个线程池,其队列长度可通过spark.streaming.concurrentJobs进行配置,默认等于1。该参数决定了能够并发执行的Job数量。
JobHandler是Job的封装,会在执行Job的逻辑前后分别发布JobStarted和JobCompleted事件。而Job对象真正执行的逻辑就是在ForEachDStream类中的创建Job时foreachFunc闭包。
目前贝壳的实时计算平台上运行着143个实时任务,其中121个为Spark任务,62个为Spark-Streaming任务,59个为Structured Streaming任务。为了实现组内任务的快速开发,我们在Spark实时技术栈的基础上抽象出了Chronus框架,将流计算抽象出Source,Sink和Pipeline,其中Source和Sink可通过配置直接定义,开发时能够将精力集中在Pipeline中的业务逻辑上。
框架对监控,offset管理都做了封装。同时,我们基于Chronus框架也开发除了很多任务模板,能够让用户通过一些简单的可视化配置直接进行流式任务的开发,其中最典型的要属SQL模板:
上述的那59个Structured Streaming就是基于该SQL模板开发的实时任务。
作者介绍:
顾渊离,目前负责贝壳大数据部实时计算平台底层引擎和实时场景模板相关开发工作。
本文转载自公众号贝壳产品技术。
原文链接:
领取专属 10元无门槛券
私享最新 技术干货