累加器:分布式共享只写变量。(Executor和Executor之间不能读数据) 累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
统计列表中的元素之和
@Test
def demo: Unit ={
val conf=new SparkConf().setMaster("local[2]").setAppName("test")
val sc =new SparkContext(conf)
//定义一个集合,分区为2;方便计算
val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
// 统计元素之和
var sum=0
// 循环累加
rdd1.foreach(e=>{
sum =sum+e
})
// 输出结果
println(s"sum=$sum")
}此时 sum 结果为多少? 答案为0
sum=0为什么是0呢?难道不应该是3+2+5+4+8+6=28吗?
原因很简单,foreach 属于Action算子;算子都是是Executor中执行的,算子外的都在是Driver中执行的。若算子中的若要引入外部变量的数据,就需要进行序列化。
具体的操作如图;
草图
虽然对sum进行累加,但只是作用于分区内而言,对于Driver而言,sum始终是没有改变的。
我们可以打印出来看看,task就是一个线程,使用Thread.currentThread().getName可以获取线程名称
// 循环累加
rdd1.foreach(e=>{
sum =sum+e
println(s"${Thread.currentThread().getName};sum=$sum, e=$e ")
})分区0
Executor task launch worker for task 0;sum=3, e=3
Executor task launch worker for task 0;sum=5, e=2
Executor task launch worker for task 0;sum=10, e=5 分区1
Executor task launch worker for task 1;sum=4, e=4
Executor task launch worker for task 1;sum=12, e=8
Executor task launch worker for task 1;sum=18, e=6 当然你可以说,我不用foreach,用其他的算子不行吗?当然可以,比如使用reduce。
@Test
def demo: Unit ={
val conf=new SparkConf().setMaster("local[2]").setAppName("test")
val sc =new SparkContext(conf)
//定义一个集合,分区为2
val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
// 数据聚集
val sum=rdd1.reduce(_+_)
// 输出结果
println(s"sum=$sum")
}输出结果,答案是28
sum=28条条大路通罗马,实现方式多种多样。
在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark Streaming应用中记录某些事件的数量。
使用累加器需要使用SparkContext设置
如下:sumAccumulator=累加器取个名
val sumAccumulator=sc.longAccumulator("sumAccumulator")内置累加器 内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator LongAccumulator: 数值型累加
LongAccumulator longAccumulator = sc.longAccumulator("long-account");DoubleAccumulator: 小数型累加
DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");CollectionAccumulator:集合累加
CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("double-account");案例演示:
@Test
def demo2(): Unit ={
val conf=new SparkConf().setMaster("local[2]").setAppName("test")
val sc =new SparkContext(conf)
// 定义累加器
val sumAccumulator=sc.longAccumulator("sumAccumulator")
//定义一个集合,分区为2
val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
// 循环累加
rdd1.foreach(e=>{
sumAccumulator.add(e)
})
// 输出结果
println(s"sum=${sumAccumulator.value}")
}结果
sum=28其他两种也就不演示了,使用起来都是一样。
add:存放数据
value:获取结果
累加器:分布式只写变量(Executor端的task不能互相访问累加器的值)。
累加器对信息进行聚合。向Spark传递函数时,通常可以使用Driver端定义的变量,但是在Executor端使用此变量时,每个task中使用的都是此变量的副本。如果变量的值发生了变化,Driver端的变量值却不会改变。
我们可以通过累加器实现分片处理,同时更新变量值
原文链接:https://blog.csdn.net/FlatTiger/article/details/115133641
可以不用,但是不能不会。
自定义累加器步骤
案例:
使用累加器实现WroldCount功能
AccumulatorV2
AccumulatorV2需要我们指定两个类型,
INT:表示输入的数据类型
OUT:表示返回结果的数据类型。abstract class AccumulatorV2[IN, OUT]不太理解没有关系,我们可以看看longAccumulator累加器中 IN 和 OUT 指定是什么?
传进去的是一个Long ,返回的也是一个Long;
class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {我们在哪里传入的呢? add 就是传进去的参数(int 可以自动转为long)
// 循环累加
rdd1.foreach(e=>{
sumAccumulator.add(e)
})我的思考方式应该是,我们应该给add传入什么类型的数据,该数据类型不就是IN吗?
既然是单词出现的个数,能否指定为String?若只是单纯的指定为String好像不太好计算。
List("python","java","python","java","spark")我们可以给每个单词分配一个值 1;
List(("python",1),("java",1),("python",1),("java",1),("spark",1))这样IN 的参数类型就明确了,首先是一个元组,元组类型为(String,Int)
那么OUT的类型呢?看下面的代码片段思考出了什么吗?
// 输出结果
println(s"sum=${sumAccumulator.value}")value 返回是不是最终的结果?WorldCount程序数据结果是什么?
是否就是这个?
List(("python",2),("java",2),("spark",1))OUT的类型,我们可以指定成一个List ,里面的元素类型,还是一个元组(String,Int)
还需要重写里面的方法。
class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
/**
* 累加器是否为空
*/
override def isZero: Boolean = ???
/**
* 复制累加器
*/
override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???
/**
* 重置累加器
*/
override def reset(): Unit = ???
/**
* 累加元素 [在每个task中累加]
*/
override def add(v: (String, Int)): Unit = ???
/**
* 合并每个task的累加结果【在Driver中合并】
*/
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???
/**
* 获取Driver汇总结果
*/
override def value: List[(String, Int)] = ???
}先不着急写里面的实现,先调用,这样方便理解。
@Test
def demo3(): Unit ={
val conf=new SparkConf().setMaster("local[2]").setAppName("test")
val sc =new SparkContext(conf)
//初始化累加器
val acc = new CustomAccumulator
//注册累加器
sc.register(acc,"CustomAccumulator")
//读取文件
val lines=sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",2)
// 列裁剪,数据扁平化
val value: RDD[String] = lines.flatMap(_.split(" "))
// 转换成我们需要的数据结构
val mapList: RDD[(String, Int)] = value.map(e => (e, 1))
// 循环累加
mapList.foreach(e=>{
acc.add(e)
})
// 输出结果
println(s"sum=${acc.value}")
}worldCount.txt 内容
hello java shell
python java java
wahaha java shell
hello java shell shell每一个元素都会交给add,就先完成add函数
import scala.collection.mutable
// 定义一个可变map 存储add 传入进来的元素
val result=mutable.Map[String,Int]() /**
* 累加元素 [在每个task中累加]
*/
override def add(v: (String, Int)): Unit = {
// 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素
// 根据key找到map中的元素,修改原来的总数
val sum=this.result.getOrElse(v._1,0)+v._2
// 覆盖原来的key
this.result.put(v._1,sum)
}不太理解也没关系,下面有完整的代码。
value 返回的结果不就是result的结果吗?所以直接map转list。
/**
* 获取Driver汇总结果
*/
override def value: List[(String, Int)] = this.result.toList目前完成代码
class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
import scala.collection.mutable
// 定义一个可变map 存储add 传入进来的元素
val result=mutable.Map[String,Int]()
/**
* 累加器是否为空
*/
override def isZero: Boolean = ???
/**
* 复制累加器
*/
override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???
/**
* 重置累加器
*/
override def reset(): Unit = ???
/**
* 累加元素 [在每个task中累加]
*/
override def add(v: (String, Int)): Unit = {
// 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素
// 根据key找到map中的元素,修改原来的总数
val sum=this.result.getOrElse(v._1,0)+v._2
// 覆盖原来的key
this.result.put(v._1,sum)
}
/**
* 合并每个task的累加结果【在Driver中合并】
*/
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???
/**
* 获取Driver汇总结果
*/
override def value: List[(String, Int)] = this.result.toList
}当前累加器的数据都是在result中,所以直接判断 result是否为空即可
/**
* 累加器是否为空
*/
override def isZero: Boolean = result.isEmpty复制累加器;理解起来有点抽象,new CustomAccumulator定义在Driver中,但是整个计算是在每个分区中,所以我们需要创建一个新的累加器给他(后面会有画图,理解起来就不会那么抽象了)。
/**
* 复制累加器
*/
override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = new CustomAccumulator()重置累加器 : 就是清空数据
/**
* 重置累加器
*/
override def reset(): Unit = this.result.clear()上面说了,计算都在分区中进行的,所以需要对每个分区的数据进行汇总
/**
* 合并每个task的累加结果【在Driver中合并】
*/
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
// 获取其他分区的累加器数据结果
val value: List[(String, Int)] = other.value
//与result数据合并
val list: List[(String, Int)] = result.toList
// 此时 newList 中肯定有重复数据
val newList: List[(String, Int)] =list++value
// 分组,聚合
val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
println(groupList)
// e._1 单词
// e._2 依然还是一个列表
// e._2.map(_._2).sum 获取里面的单词数
val newResult: Map[String, Int] =groupList.map(e=>{
val sum = e._2.map(_._2).sum
(e._1,sum)
})
// 合并map
result.++=(newResult)
}完整代码
class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
import scala.collection.mutable
// 定义一个可变map 存储add 传入进来的元素
val result=mutable.Map[String,Int]()
/**
* 累加器是否为空
*/
override def isZero: Boolean = result.isEmpty
/**
* 复制累加器
*/
override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] =new CustomAccumulator()
/**
* 重置累加器
*/
override def reset(): Unit = this.result.clear()
/**
* 累加元素 [在每个task中累加]
*/
override def add(v: (String, Int)): Unit = {
// 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素
// 根据key找到map中的元素,修改原来的总数
val sum=this.result.getOrElse(v._1,0)+v._2
// 覆盖原来的key
this.result.put(v._1,sum)
}
/**
* 合并每个task的累加结果【在Driver中合并】
*/
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
// 获取其他分区的累加器数据结果
val value: List[(String, Int)] = other.value
//与result数据合并
val list: List[(String, Int)] = result.toList
// 此时 newList 中肯定有重复数据
val newList: List[(String, Int)] =list++value
// 分组,聚合
val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
println(groupList)
// e._1 单词
// e._2 依然还是一个列表
// e._2.map(_._2).sum 获取里面的单词数
val newResult: Map[String, Int] =groupList.map(e=>{
val sum = e._2.map(_._2).sum
(e._1,sum)
})
// 合并map
result.++=(newResult)
}
/**
* 获取Driver汇总结果
*/
override def value: List[(String, Int)] = this.result.toList
}数据结果
sum=List((wahaha,1), (java,5), (shell,4), (hello,2), (python,1))分区二与分区一合并的数据。
Map(shell -> List((shell,2), (shell,2)), wahaha -> List((wahaha,1)), java -> List((java,1), (java,4)), python -> List((python,1)), hello -> List((hello,1), (hello,1)))本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。