我有一个spark作业,它从hdfs中获取一个包含8条记录的文件,执行简单的聚合并将其保存回hdfs。当我这样做的时候,我注意到有成百上千的任务。
我也不确定为什么会有多个工作要做?我认为工作更像是行动发生的时候。我可以推测原因--但我的理解是,在这段代码中,它应该是一个作业,它应该被分成几个阶段,而不是多个作业。为什么不把它分成几个阶段,为什么它会分成几个工作呢?
至于200多个任务,由于数据量和节点量微不足道,当只有一个聚合和几个筛选器时,每行数据有25个任务是没有意义的。为什么它不能在每个原子操作的每个分区上只有一个任务?
以下是相关的scala代码-
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object TestProj {object TestProj {
def main(args: Array[String]) {
/* set the application name in the SparkConf object */
val appConf = new SparkConf().setAppName("Test Proj")
/* env settings that I don't need to set in REPL*/
val sc = new SparkContext(appConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
/*the below rdd will have schema defined in Record class*/
val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
.map(x=>x.split(" ")) //file record into array of strings based spaces
.map(x=>Record(
x(0).toInt,
x(1).asInstanceOf[String],
x(2).asInstanceOf[String],
x(3).toInt))
/* the below dataframe groups on first letter of first name and counts it*/
val aggDF = rddCase.toDF()
.groupBy($"firstName".substr(1,1).alias("firstLetter"))
.count
.orderBy($"firstLetter")
/* save to hdfs*/
aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")
}
case class Record(id: Int
, firstName: String
, lastName: String
, quantity:Int)
}
下面是单击应用程序后的屏幕截图
下面是查看id为0的特定"job“时显示的阶段
下面是单击包含200多个任务的阶段时屏幕的第一部分
这是舞台内屏幕的第二部分
下面是单击"executors“选项卡之后的内容
根据请求,以下是作业ID 1的阶段
以下是作业ID 1中包含200个任务的阶段的详细信息
发布于 2016-06-11 12:26:42
这是一个经典的Spark问题。
用于读取(第二图中的阶段Id为0)的两个任务是defaultMinPartitions
设置,该设置被设置为2。您可以通过读取REPL sc.defaultMinPartitions
中的值来获取此参数。它也应该在Spark UI中的"Environment“选项卡下可见。
你可以看看来自GitHub的code,看看这到底是怎么回事。如果您希望在读取时使用更多分区,只需将其作为参数添加,例如,sc.textFile("a.txt", 20)
。
现在,有趣的部分来自第二个阶段上的200个分区(第二个图中的阶段Id 1)。好吧,每次有一次混洗,Spark需要决定混洗RDD将有多少分区。正如您可以想象的,默认值是200。
您可以使用以下命令进行更改:
sqlContext.setConf("spark.sql.shuffle.partitions", "4”)
如果您使用此配置运行代码,您将看到这200个分区将不再存在。如何设置这个参数是一门艺术。也许可以选择2倍于你拥有的内核数量(或者其他)。
我认为Spark 2.0有一种方法可以自动推断出混洗RDDs的最佳分区数量。期待这一天!
最后,您获得的作业数量与优化后的数据帧代码产生的RDD操作的数量有关。如果你读过Spark规范,它说每个RDD操作将触发一个作业。当您的操作涉及到Dataframe或SparkSQL时,Catalyst优化器将找出一个执行计划,并生成一些基于RDD的代码来执行它。很难说为什么它在你的例子中使用了两个动作。您可能需要查看优化的查询计划,以了解正在执行的操作。
发布于 2016-09-29 02:38:49
我也有类似的问题。但在我的场景中,我正在并行化的集合包含的元素比Spark调度的任务数量要少(导致spark有时行为异常)。使用强制分区号,我能够解决这个问题。
大概是这样的:
collection = range(10) # In the real scenario it was a complex collection
sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario
然后,我在Spark日志中看到:
INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks
https://stackoverflow.com/questions/37758647
复制相似问题