首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >[spark streaming] 动态生成 Job 并提交执行

[spark streaming] 动态生成 Job 并提交执行

作者头像
UFO
发布于 2018-08-29 09:38:56
发布于 2018-08-29 09:38:56
1.3K00
代码可运行
举报
文章被收录于专栏:Spark生态圈Spark生态圈
运行总次数:0
代码可运行

前言

Spark Streaming Job的生成是通过JobGenerator每隔 batchDuration 长时间动态生成的,每个batch 对应提交一个JobSet,因为针对一个batch可能有多个输出操作。

概述流程:

  • 定时器定时向 eventLoop 发送生成job的请求
  • 通过receiverTracker 为当前batch分配block
  • 为当前batch生成对应的 Jobs
  • 将Jobs封装成JobSet 提交执行

入口

在 JobGenerator 初始化的时候就创建了一个定时器:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

每隔 batchDuration 就会向 eventLoop 发送 GenerateJobs(new Time(longTime))消息,eventLoop的事件处理方法中会调用generateJobs(time)方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
      case GenerateJobs(time) => generateJobs(time)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

为当前batchTime分配Block

首先调用receiverTracker.allocateBlocksToBatch(time)方法为当前batchTime分配对应的Block,最终会调用receiverTracker的Block管理者receivedBlockTrackerallocateBlocksToBatch方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
  }

可以看到是从streamIdToUnallocatedBlockQueues中获取到所有streamId对应的未分配的blocks,该队列的信息是supervisor 存储好Block后向receiverTracker上报的Block信息,详情可见 ReceiverTracker 数据产生与存储

获取到所有streamId对应的未分配的blockInfos后,将其放入了timeToAllocatedBlocks:Map[Time, AllocatedBlocks]中,后面生成RDD的时候会用到。

为当前batchTime生成Jobs

调用DStreamGraphgenerateJobs方法为当前batchTime生成job:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

一个outputStream就对应一个job,遍历所有的outputStreams,为其生成job:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# ForEachDStream
override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

先获取到time对应的RDD,然后将其作为参数再调用foreachFunc方法,foreachFunc方法是通过构造器传过来的,我们来看看print()输出的情况:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

这里的构造的foreachFunc方法就是最终和rdd一起提交job的执行方法,也即对rdd调用take()后并打印,真正触发action操作的是在这个func函数里,现在再来看看是怎么拿到rdd的,每个DStream都有一个generatedRDDs:Map[Time, RDD[T]]变量,来保存time对应的RDD,若获取不到则会通过compute()方法来计算,对于需要在executor上启动Receiver来接收数据的ReceiverInputDStream来说:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

会通过receiverTracker来获取该batch对应的blocks,前面已经分析过为所有streamId分配了对应的未分配的block,并且放在了timeToAllocatedBlocks:Map[Time, AllocatedBlocks]中,这里底层就是从这个timeToAllocatedBlocks获取到的blocksInfo,然后调用了createBlockRDD(validTime, blockInfos)通过blockId创建了RDD。

最后,将通过此RDD和foreachFun构建jobFunc,并创建Job返回。

封装jobs成JobSet并提交执行

每个outputStream对应一个Job,最终就会生成一个jobs,为这个jobs创建JobSet,并通过jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))来提交这个JobSet:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

然后通过jobExecutor来执行,jobExecutor是一个线程池,并行度默认为1,可通过spark.streaming.concurrentJobs配置,即同时可执行几个批次的数据。

处理类JobHandler中调用的是Job.run(),执行的是前面构建的 jobFunc 方法。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.12.04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
揭开Spark Streaming神秘面纱③ - 动态生成 job
JobScheduler有两个重要成员,一是上文介绍的 ReceiverTracker,负责分发 receivers 及源源不断地接收数据;二是本文将要介绍的 JobGenerator,负责定时的生成 jobs 并 checkpoint。
codingforfun
2018/08/24
3750
揭开Spark Streaming神秘面纱③ - 动态生成 job
为啥spark 的broadcast要用单例模式
很多用Spark Streaming 的朋友应该使用过broadcast,大多数情况下广播变量都是以单例模式声明的有没有粉丝想过为什么?浪尖在这里帮大家分析一下,有以下几个原因:
Spark学习技巧
2019/06/18
1.1K0
为啥spark 的broadcast要用单例模式
Spark Streaming 数据清理机制
为啥要了解机制呢?这就好比JVM的垃圾回收,虽然JVM的垃圾回收已经巨牛了,但是依然会遇到很多和它相关的case导致系统运行不正常。
用户2936994
2018/08/27
1.2K0
Spark源码系列(八)Spark Streaming实例分析
这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照《Spark Streaming编程指南》。 Example代码分析 val ssc = new StreamingContext(sparkConf, Seconds(1)); // 获得一个DStream负责连接 监听端口:地址 val lines = ssc.socketTextStream(serverIP, serverPort); // 对每一行数据执行Split操作 val words = lines.flatM
岑玉海
2018/02/28
8500
Spark Streaming性能优化: 如何在生产环境下动态应对流数据峰值
默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟)。Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate ”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。
王知无-import_bigdata
2020/11/24
8540
Spark Streaming性能优化: 如何在生产环境下动态应对流数据峰值
【容错篇】WAL在Spark Streaming中的应用【容错篇】WAL在Spark Streaming中的应用
WAL 即 write ahead log(预写日志),是在 1.2 版本中就添加的特性。作用就是,将数据通过日志的方式写到可靠的存储,比如 HDFS、s3,在 driver 或 worker failure 时可以从在可靠存储上的日志文件恢复数据。WAL 在 driver 端和 executor 端都有应用。我们分别来介绍。
codingforfun
2018/08/24
1.3K0
【容错篇】WAL在Spark Streaming中的应用【容错篇】WAL在Spark Streaming中的应用
spark-streaming的checkpoint机制源码分析
转发请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/7994357.html
sanmutongzi
2020/03/04
8040
Spark Streaming 误用.transform(func)函数导致的问题解析
特定情况你会发现UI 的Storage标签上有很多新的Cache RDD,然后你以为是Cache RDD 不被释放,但是通过Spark Streaming 数据清理机制分析我们可以排除这个问题。
用户2936994
2018/08/27
4670
[spark streaming] ReceiverTracker 数据产生与存储
在Spark Streaming里,总体负责任务的动态调度是JobScheduler,而JobScheduler有两个很重要的成员:JobGenerator 和 ReceiverTracker。JobGenerator 负责将每个 batch 生成具体的 RDD DAG ,而ReceiverTracker负责数据的来源。
UFO
2018/08/29
6510
Spark源码解析:DStream
0x00 前言 本篇是Spark源码解析的第二篇,主要通过源码分析Spark Streaming设计中最重要的一个概念——DStream。 本篇主要来分析Spark Streaming中的Dstream,重要性不必多讲,明白了Spark这个几个数据结构,容易对Spark有一个整体的把握。 和RDD那篇文章类似,虽说是分析Dstream,但是整篇文章会围绕着一个具体的例子来展开。算是对Spark Streaming源码的一个概览。 文章结构 Spark Streaming的一些概念,主要和Dstream
木东居士
2018/05/25
9290
[spark streaming] DStream 和 DStreamGraph 解析
Spark Streaming 是基于Spark Core将流式计算分解成一系列的小批处理任务来执行。
UFO
2018/08/29
7360
揭开Spark Streaming神秘面纱④ - job 的提交与执行
前文揭开Spark Streaming神秘面纱③ - 动态生成 job 我们分析了 JobScheduler 是如何动态为每个 batch生成 jobs,本文将说明这些生成的 jobs 是如何被提交的。
codingforfun
2018/08/24
4590
Spark Streaming | Spark,从入门到精通
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)
美图数据技术团队
2018/09/18
1.1K0
Spark Streaming | Spark,从入门到精通
揭开Spark Streaming神秘面纱⑥ - Spark Streaming结合 Kafka 两种不同的数据接收方式比较
DirectKafkaInputDStream 只在 driver 端接收数据,所以继承了 InputDStream,是没有 receivers 的
codingforfun
2018/08/24
8300
揭开Spark Streaming神秘面纱⑥ - Spark Streaming结合 Kafka 两种不同的数据接收方式比较
揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入
只需在 driver 端接收数据的 input stream 一般比较简单且在生产环境中使用的比较少,本文不作分析,只分析继承了 ReceiverInputDStream 的 input stream 是如何导入数据的。
codingforfun
2018/08/24
2800
揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入
【容错篇】Spark Streaming的还原药水——Checkpoint
一个 Streaming Application 往往需要7*24不间断的跑,所以需要有抵御意外的能力(比如机器或者系统挂掉,JVM crash等)。为了让这成为可能,Spark Streaming需要 checkpoint 足够多信息至一个具有容错设计的存储系统才能让 Application 从失败中恢复。Spark Streaming 会 checkpoint 两种类型的数据。
codingforfun
2018/08/24
5920
【容错篇】Spark Streaming的还原药水——Checkpoint
SparkStreaming源码阅读思路
SparkStreaming的DirectAPI源码阅读思路 Spark Streaming的流式处理,尤其和kafka的集合,应该是企业应用的关键技术点,作为spark学习和工作者,要熟练的掌握其中原理,精读源码,才能更好的完成任务和相关工调优工作内容。对其原理简介,浪尖不啰嗦,请看前面的文章《聊聊流式批处理》。在这里浪尖主要介绍,Spark Streaming源码阅读时的注意事项及关注点,只有牢牢把握这几点,才能更好的使用Spark Streaming。 阅读源码谨记的点 对于SparkStreamin
Spark学习技巧
2018/06/22
5670
[spark streaming] 状态管理 updateStateByKey&mapWithState
SparkStreaming 7*24 小时不间断的运行,有时需要管理一些状态,比如wordCount,每个batch的数据不是独立的而是需要累加的,这时就需要sparkStreaming来维护一些状态,目前有两种方案updateStateByKey&mapWithState,mapWithState是spark1.6新加入的保存状态的方案,官方声称有10倍性能提升。
UFO
2018/08/29
1.2K0
sparkstreaming的状态计算-updateStateByKey源码
转发请注明原创地址:https://www.cnblogs.com/dongxiao-yang/p/11358781.html
sanmutongzi
2020/03/04
4320
Spark ListenerBus 和 MetricsSystem 体系分析
监控是一个大系统完成后最重要的一部分。Spark整个系统运行情况是由ListenerBus以及MetricsSystem 来完成的。这篇文章重点分析他们之间的工作机制以及如何通过这两个系统完成更多的指标收集。
用户2936994
2018/08/27
7540
推荐阅读
相关推荐
揭开Spark Streaming神秘面纱③ - 动态生成 job
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验