我已经通过Kafka将数据按键排序到我的Spark流分区中,也就是说,在一个节点上找到的密钥在任何其他节点上都找不到。
我希望使用redis及其incrby
(增量按)命令作为状态引擎,并减少发送给redis的请求数量,我希望通过单独对每个工作节点进行单词计数来部分减少我的数据。(关键是从word计数中获得我的功能的tag+timestamp )。我希望避免洗牌,让redis负责跨工作节点添加数据。
即使当我检查数据在工作节点之间被清晰地分割时,.reduce(_ + _)
(Scala语法)也需要很长时间(对于映射任务需要几秒对子秒),因为HashPartitioner似乎将我的数据洗牌到一个随机节点上,以便将其添加到那里。
如何才能在每个分区器上编写一个简单的字数减少,而不触发Scala中具有星火流的洗牌步骤?
注DStream对象缺少一些RDD方法,这些方法只能通过transform
方法使用。
看来我可以使用combineByKey
了。我想跳过mergeCombiners()
步骤,而将累积的元组留在原来的位置。“学习火花”一书神秘地写道:
如果我们知道我们的数据不会从中受益,我们就可以在combineByKey()中禁用地图端聚合。例如,groupByKey()禁用映射端聚合,因为聚合函数(附加到列表)不会节省任何空间。如果我们想禁用映射端组合,我们需要指定分区器;目前,您可以通过传递rdd.partitioner在源RDD上使用分区器。
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
这本书继续没有提供语法来做这件事,我也没有任何运气与谷歌到目前为止。
更糟糕的是,据我所知,在星火流中没有为DStream RDDs设置分区器,所以我不知道如何向combineByKey提供一个不会导致数据混乱的分区器。
另外,“地图端”实际上意味着什么,mapSideCombine = false
到底有什么后果?
combineByKey
的scala实现可以在https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala查找combineByKeyWithClassTag
中找到。
如果解决方案涉及自定义分区程序,请还包括一个代码示例,说明如何将该分区程序应用于传入的DStream。
发布于 2016-10-01 01:06:24
这可以使用mapPartitions
完成,它使用一个函数将一个分区上的输入RDD的迭代器映射到输出RDD上的迭代器。
为了实现单词计数,我映射到_._2
以删除Kafka键,然后使用foldLeft
执行快速迭代器单词计数,初始化mutable.hashMap
,然后将mutable.hashMap
转换为iterator以形成输出RDD。
val myDstream = messages
.mapPartitions( it =>
it.map(_._2)
.foldLeft(new mutable.HashMap[String, Int])(
(count, key) => count += (key -> (count.getOrElse(key, 0) + 1))
).toIterator
)
https://stackoverflow.com/questions/39761558
复制相似问题