首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何仅在星火流中的分区中“减少”,也许使用combineByKey?

如何仅在星火流中的分区中“减少”,也许使用combineByKey?
EN

Stack Overflow用户
提问于 2016-09-29 04:42:18
回答 1查看 3.8K关注 0票数 0

我已经通过Kafka将数据按键排序到我的Spark流分区中,也就是说,在一个节点上找到的密钥在任何其他节点上都找不到。

我希望使用redis及其incrby (增量按)命令作为状态引擎,并减少发送给redis的请求数量,我希望通过单独对每个工作节点进行单词计数来部分减少我的数据。(关键是从word计数中获得我的功能的tag+timestamp )。我希望避免洗牌,让redis负责跨工作节点添加数据。

即使当我检查数据在工作节点之间被清晰地分割时,.reduce(_ + _) (Scala语法)也需要很长时间(对于映射任务需要几秒对子秒),因为HashPartitioner似乎将我的数据洗牌到一个随机节点上,以便将其添加到那里。

如何才能在每个分区器上编写一个简单的字数减少,而不触发Scala中具有星火流的洗牌步骤?

注DStream对象缺少一些RDD方法,这些方法只能通过transform方法使用。

看来我可以使用combineByKey了。我想跳过mergeCombiners()步骤,而将累积的元组留在原来的位置。“学习火花”一书神秘地写道:

如果我们知道我们的数据不会从中受益,我们就可以在combineByKey()中禁用地图端聚合。例如,groupByKey()禁用映射端聚合,因为聚合函数(附加到列表)不会节省任何空间。如果我们想禁用映射端组合,我们需要指定分区器;目前,您可以通过传递rdd.partitioner在源RDD上使用分区器。

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

这本书继续没有提供语法来做这件事,我也没有任何运气与谷歌到目前为止。

更糟糕的是,据我所知,在星火流中没有为DStream RDDs设置分区器,所以我不知道如何向combineByKey提供一个不会导致数据混乱的分区器。

另外,“地图端”实际上意味着什么,mapSideCombine = false到底有什么后果?

combineByKey的scala实现可以在https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala查找combineByKeyWithClassTag中找到。

如果解决方案涉及自定义分区程序,请还包括一个代码示例,说明如何将该分区程序应用于传入的DStream。

EN

回答 1

Stack Overflow用户

发布于 2016-10-01 01:06:24

这可以使用mapPartitions完成,它使用一个函数将一个分区上的输入RDD的迭代器映射到输出RDD上的迭代器。

为了实现单词计数,我映射到_._2以删除Kafka键,然后使用foldLeft执行快速迭代器单词计数,初始化mutable.hashMap,然后将mutable.hashMap转换为iterator以形成输出RDD。

代码语言:javascript
运行
复制
val myDstream = messages
  .mapPartitions( it =>
    it.map(_._2)
    .foldLeft(new mutable.HashMap[String, Int])(
      (count, key) => count += (key -> (count.getOrElse(key, 0) + 1))
    ).toIterator
  )
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39761558

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档