发布
社区首页 >问答首页 >为什么在我的spark工作中有这么多任务?默认获取200个任务

为什么在我的spark工作中有这么多任务?默认获取200个任务
EN

Stack Overflow用户
提问于 2016-06-11 08:00:36
回答 2查看 17.4K关注 0票数 23

我有一个spark作业,它从hdfs中获取一个包含8条记录的文件,执行简单的聚合并将其保存回hdfs。当我这样做的时候,我注意到有成百上千的任务。

我也不确定为什么会有多个工作要做?我认为工作更像是行动发生的时候。我可以推测原因--但我的理解是,在这段代码中,它应该是一个作业,它应该被分成几个阶段,而不是多个作业。为什么不把它分成几个阶段,为什么它会分成几个工作呢?

至于200多个任务,由于数据量和节点量微不足道,当只有一个聚合和几个筛选器时,每行数据有25个任务是没有意义的。为什么它不能在每个原子操作的每个分区上只有一个任务?

以下是相关的scala代码-

代码语言:javascript
代码运行次数:0
复制
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object TestProj {object TestProj {
  def main(args: Array[String]) {

    /* set the application name in the SparkConf object */
    val appConf = new SparkConf().setAppName("Test Proj")

    /* env settings that I don't need to set in REPL*/
    val sc = new SparkContext(appConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")

     /*the below rdd will have schema defined in Record class*/
     val rddCase =  sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
      .map(x=>x.split(" "))    //file record into array of strings based spaces
      .map(x=>Record(
        x(0).toInt,
        x(1).asInstanceOf[String],
        x(2).asInstanceOf[String],
        x(3).toInt))


    /* the below dataframe groups on first letter of first name and counts it*/
    val aggDF = rddCase.toDF()
      .groupBy($"firstName".substr(1,1).alias("firstLetter"))
      .count
      .orderBy($"firstLetter")

    /* save to hdfs*/ 
 aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")

  }

    case class Record(id: Int
      , firstName: String
      , lastName: String
      , quantity:Int)

}

下面是单击应用程序后的屏幕截图

下面是查看id为0的特定"job“时显示的阶段

下面是单击包含200多个任务的阶段时屏幕的第一部分

这是舞台内屏幕的第二部分

下面是单击"executors“选项卡之后的内容

根据请求,以下是作业ID 1的阶段

以下是作业ID 1中包含200个任务的阶段的详细信息

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-06-11 12:26:42

这是一个经典的Spark问题。

用于读取(第二图中的阶段Id为0)的两个任务是defaultMinPartitions设置,该设置被设置为2。您可以通过读取REPL sc.defaultMinPartitions中的值来获取此参数。它也应该在Spark UI中的"Environment“选项卡下可见。

你可以看看来自GitHub的code,看看这到底是怎么回事。如果您希望在读取时使用更多分区,只需将其作为参数添加,例如,sc.textFile("a.txt", 20)

现在,有趣的部分来自第二个阶段上的200个分区(第二个图中的阶段Id 1)。好吧,每次有一次混洗,Spark需要决定混洗RDD将有多少分区。正如您可以想象的,默认值是200。

您可以使用以下命令进行更改:

代码语言:javascript
代码运行次数:0
复制
sqlContext.setConf("spark.sql.shuffle.partitions", "4”)

如果您使用此配置运行代码,您将看到这200个分区将不再存在。如何设置这个参数是一门艺术。也许可以选择2倍于你拥有的内核数量(或者其他)。

我认为Spark 2.0有一种方法可以自动推断出混洗RDDs的最佳分区数量。期待这一天!

最后,您获得的作业数量与优化后的数据帧代码产生的RDD操作的数量有关。如果你读过Spark规范,它说每个RDD操作将触发一个作业。当您的操作涉及到Dataframe或SparkSQL时,Catalyst优化器将找出一个执行计划,并生成一些基于RDD的代码来执行它。很难说为什么它在你的例子中使用了两个动作。您可能需要查看优化的查询计划,以了解正在执行的操作。

票数 33
EN

Stack Overflow用户

发布于 2016-09-29 02:38:49

我也有类似的问题。但在我的场景中,我正在并行化的集合包含的元素比Spark调度的任务数量要少(导致spark有时行为异常)。使用强制分区号,我能够解决这个问题。

大概是这样的:

代码语言:javascript
代码运行次数:0
复制
collection = range(10) # In the real scenario it was a complex collection
sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario

然后,我在Spark日志中看到:

代码语言:javascript
代码运行次数:0
复制
INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37758647

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档