Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark 如何使用累加器Accumulator

Spark 如何使用累加器Accumulator

作者头像
smartsi
发布于 2019-08-07 06:09:29
发布于 2019-08-07 06:09:29
2.9K00
代码可运行
举报
文章被收录于专栏:SmartSiSmartSi
运行总次数:0
代码可运行

Accumulator 是 spark 提供的累加器,累加器可以用来实现计数器(如在 MapReduce 中)或者求和。Spark 本身支持数字类型的累加器,程序员可以添加对新类型的支持。

1. 内置累加器

在 Spark2.0.0 版本之前,我们可以通过调用 SparkContext.intAccumulator()SparkContext.doubleAccumulator() 来创建一个 Int 或 Double 类型的累加器:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Accumulator<Double> doubleAccumulator = sparkContext.doubleAccumulator(0.0, "Double Accumulator");
Accumulator<Integer> intAccumulator = sparkContext.intAccumulator(0, "Int Accumulator");
Accumulator<Double> doubleAccumulator2 = sparkContext.accumulator(0.0, "Double Accumulator 2");
Accumulator<Integer> intAccumulator2 = sparkContext.accumulator(0, "Int Accumulator 2");java

在 Spark2.0.0 之后的版本中,之前的的 Accumulator 已被废除,用 AccumulatorV2 代替:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@deprecated("use AccumulatorV2", "2.0.0")
class Accumulator[T] private[spark] (
    // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
    @transient private val initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String] = None,
    countFailedValues: Boolean = false)
  extends Accumulable[T, T](initialValue, param, name, countFailedValues)

// Int
@deprecated("use sc().longAccumulator(String)", "2.0.0")
def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
  sc.accumulator(initialValue, name)(IntAccumulatorParam)
    .asInstanceOf[Accumulator[java.lang.Integer]]

@deprecated("use sc().longAccumulator(String)", "2.0.0")
def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
  intAccumulator(initialValue, name)

// Double
@deprecated("use sc().doubleAccumulator(String)", "2.0.0")
def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
  sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
    .asInstanceOf[Accumulator[java.lang.Double]]

@deprecated("use sc().doubleAccumulator(String)", "2.0.0")
def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
  doubleAccumulator(initialValue, name)

我们可以通过调用 sparkContext.sc().longAccumulator()sparkContext.sc().doubleAccumulator() 来创建一个 Long 或 Double 类型的累加器:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DoubleAccumulator doubleAccumulator = sparkContext.sc().doubleAccumulator("Double Accumulator");
LongAccumulator longAccumulator = sparkContext.sc().longAccumulator("Long Accumulator");

看一下这两个方法具体的实现:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * Create and register a long accumulator, which starts with 0 and accumulates inputs by `add`.
 */
def longAccumulator: LongAccumulator = {
  val acc = new LongAccumulator
  register(acc)
  acc
}

/**
 * Create and register a double accumulator, which starts with 0 and accumulates inputs by `add`.
 */
def doubleAccumulator: DoubleAccumulator = {
  val acc = new DoubleAccumulator
  register(acc)
  acc
}

通过源码我们知道分别通过创建 LongAccumulatorDoubleAccumulator 对象,然后进行注册来创建一个累加器。所以我们也可以使用如下方式创建一个Long类型的累加器:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
LongAccumulator longAccumulator = new LongAccumulator();
sparkContext.sc().register(longAccumulator, "Long Accumulator");

LongAccumulator DoubleAccumulator 都继承自 AccumulatorV2

Spark内置了数值型累加器(例如,Long,Double类型),我们还可以通过继承 AccumulatorV2 来创建我们自己类型的累加器。

2. 自定义累加器

自定义累加器类型的功能在 1.x 版本中就已经提供了,但是使用起来比较麻烦,在 Spark 2.0.0 版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2 来提供更加友好的自定义类型累加器的实现方式。官方同时给出了一个实现的示例:CollectionAccumulator,这个类允许以集合的形式收集 Spark 应用执行过程中的一些信息。例如,我们可以用这个类收集 Spark 处理数据过程中的非法数据或者引起异常的异常数据,这对我们处理异常时很有帮助。当然,由于累加器的值最终要汇聚到 Driver 端,为了避免 Driver 端的出现 OOM,需要收集的数据规模不宜过大。

实现自定义类型累加器需要继承 AccumulatorV2 并覆盖下面几个方法:

  • reset 将累加器重置为零
  • add 将另一个值添加到累加器中
  • merge 将另一个相同类型的累加器合并到该累加器中。

下面这个累加器可以用于在程序运行过程中收集一些异常或者非法数据,最终以 List[String] 的形式返回:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.sjf.open.spark;

import com.google.common.collect.Lists;
import org.apache.spark.util.AccumulatorV2;

import java.util.ArrayList;
import java.util.List;

/**
 * 自定义累加器 CollectionAccumulator
 * @author sjf0115
 * @Date Created in 下午2:11 18-6-4
 */
public class CollectionAccumulator<T> extends AccumulatorV2<T, List<T>> {

    private List<T> list = Lists.newArrayList();

    @Override
    public boolean isZero() {
        return list.isEmpty();
    }

    @Override
    public AccumulatorV2<T, List<T>> copy() {
        CollectionAccumulator<T> accumulator = new CollectionAccumulator<>();
        synchronized (accumulator) {
            accumulator.list.addAll(list);
        }
        return accumulator;
    }

    @Override
    public void reset() {
        list.clear();
    }

    @Override
    public void add(T v) {
        list.add(v);
    }

    @Override
    public void merge(AccumulatorV2<T, List<T>> other) {
        if(other instanceof CollectionAccumulator){
            list.addAll(((CollectionAccumulator) other).list);
        }
        else {
            throw new UnsupportedOperationException("Cannot merge " + this.getClass().getName() + " with " + other.getClass().getName());
        }
    }

    @Override
    public List<T> value() {
        return new ArrayList<>(list);
    }
}

下面我们在数据处理过程中收集非法坐标为例,来看一下我们自定义的累加器如何使用:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.sjf.open.spark;

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import java.io.Serializable;
import java.util.List;

/**
 * 自定义累加器示例
 * @author sjf0115
 * @Date Created in 下午2:11 18-6-4
 */
public class CustomAccumulatorExample implements Serializable{

    public static void main(String[] args) {
        String appName = "CustomAccumulatorExample";
        SparkConf conf = new SparkConf().setAppName(appName);
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        List<String> list = Lists.newArrayList();
        list.add("27.34832,111.32135");
        list.add("34.88478,185.17841");
        list.add("39.92378,119.50802");
        list.add("94,119.50802");

        CollectionAccumulator<String> collectionAccumulator = new CollectionAccumulator<>();
        sparkContext.sc().register(collectionAccumulator, "Illegal Coordinates");
        // 原始坐标
        JavaRDD<String> sourceRDD = sparkContext.parallelize(list);
        // 过滤非法坐标
        JavaRDD<String> resultRDD = sourceRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String str) throws Exception {
                String[] coordinate = str.split(",");
                double lat = Double.parseDouble(coordinate[0]);
                double lon = Double.parseDouble(coordinate[1]);
                if(Math.abs(lat) > 90 || Math.abs(lon) > 180){
                    collectionAccumulator.add(str);
                    return true;
                }
                return false;
            }
        });
        // 输出
        resultRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String coordinate) throws Exception {
                System.out.println("[Data]" + coordinate);
            }
        });
        // 查看异常坐标
        for (String coordinate : collectionAccumulator.value()) {
            System.out.println("[Illegal]: " + coordinate);
        }
    }

}

结果输出:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[Illegal]: 94,119.50802
[Illegal]: 34.88478,185.17841

3. 累加器注意事项

累加器不会改变 Spark 的懒加载(Lazy)的执行模型。如果在 RDD 上的某个操作中更新累加器,那么其值只会在 RDD 执行 action 计算时被更新一次。因此,在 transformation (例如, map())中更新累加器时,其值并不能保证一定被更新。

Spark 中的一系列 transformation 操作会构成一个任务链,需要通过 action 操作来触发。累加器也是一样的,也只能通过 action 触发更新,所以在 action 操作之前调用 value 方法查看其数值是没有任何变化的。对于在 action 中更新的累加器,Spark 会保证每个任务对累加器只更新一次,即使重新启动的任务也不会重新更新该值。而如果在 transformation 中更新的累加器,如果任务或作业 stage 被重新执行,那么其对累加器的更新可能会执行多次。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.sjf.open.spark;

import com.google.common.collect.Lists;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.LongAccumulator;

import java.io.Serializable;
import java.util.List;

/**
 * 累加器陷阱
 * @author sjf0115
 * @Date Created in 下午2:11 18-6-4
 */
public class AccumulatorTrap implements Serializable{

    public static void main(String[] args) {
        String appName = "AccumulatorTrap";
        SparkConf conf = new SparkConf().setAppName(appName);
        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        LongAccumulator evenAccumulator = sparkContext.sc().longAccumulator("Even Num Accumulator");
        LongAccumulator oddAccumulator = sparkContext.sc().longAccumulator("Odd Num Accumulator");

        /*LongAccumulator evenAccumulator = new LongAccumulator();
        LongAccumulator oddAccumulator = new LongAccumulator();

        sparkContext.sc().register(evenAccumulator, "Even Num Accumulator");
        sparkContext.sc().register(oddAccumulator, "Odd Num Accumulator");*/

        List<Integer> numList = Lists.newArrayList();
        for(int i = 0;i < 10;i++){
            numList.add(i);
        }
        JavaRDD<Integer> numRDD = sparkContext.parallelize(numList);

        // transform
        JavaRDD<Integer> resultRDD = numRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer num) throws Exception {
                if (num % 2 == 0) {
                    evenAccumulator.add(1L);
                    return 0;
                } else {
                    oddAccumulator.add(1L);
                    return 1;
                }
            }
        });

        // the first action
        resultRDD.count();
        System.out.println("Odd Num Count : " + oddAccumulator.value());
        System.out.println("Even Num Count : " + evenAccumulator.value());

        // the second action
        resultRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer num) throws Exception {
                System.out.println(num);
            }
        });
        System.out.println("Odd Num Count : " + oddAccumulator.value());
        System.out.println("Even Num Count : " + evenAccumulator.value());
    }
}

在第一个 action 算子 count 执行之后,累加器输出符合我们预期的结果:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Odd Num Count : 5
Even Num Count : 5

在第二个 action 算子 foreach 执行之后,累加器输出结果如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Odd Num Count : 10
Even Num Count : 10

其实这个时候又执行了一次 map 操作,所以累加器各自又增加了5,最终获得的结果变成了10。

看了上面的分析以及输出结果,我们知道,那就是使用累加器的过程中只能使用一次 action 操作才能保证结果的准确性。事实上,这种情况是可以解决的,只要将任务之间的依赖关系切断就可以。我们可以调用 cache,persist 等方法将之前的依赖切断,后续的累加器就不会受之前的 transfrom 操作影响了:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Odd Num Count : 5
Even Num Count : 5
Odd Num Count : 5
Even Num Count : 5

所以在使用累加器时,为了保证准确性,最好只使用一次 action 操作。如果需要使用多次,可以使用 cache 或 persist 操作切断依赖。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-06-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark累加器(Accumulator)
累加器:分布式共享只写变量。(Executor和Executor之间不能读数据) 累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
用户1483438
2022/07/26
1.8K0
Spark学习笔记——共享变量
通常,当传递给Spark操作(例如map or reduce)的函数在远程集群节点上执行时,它可以在函数中使用的所有变量的单独副本上工作。这些变量被复制到每个机器,并且远程机器上的变量的更新都不会被传播回到驱动程序。在任务之间支持一般的,读写共享变量将是低效的。然而,Spark 为两种常用的使用模式提供了两种有限类型的共享变量:广播变量和累加器。
挖掘大数据
2018/01/19
1.2K0
【Spark篇】---Spark中Action算子
Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是我们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。
LhWorld哥陪你聊算法
2018/09/13
1K0
【Spark篇】---Spark中Action算子
Spark学习之编程进阶——累加器与广播(5)
本文介绍了Spark编程进阶之累加器与广播变量,包括两种类型的共享变量:累加器与广播变量。累加器用于信息聚合,广播变量用于高效分发大型对象。通过示例展示了如何使用这两种变量在Spark中处理数据。
王小雷
2018/01/02
5780
Spark Core快速入门系列(12) | 变量与累加器问题
  正常情况下, 传递给 Spark 算子(比如: map, reduce 等)的函数都是在远程的集群节点上执行, 函数中用到的所有变量都是独立的拷贝.
不温卜火
2020/10/28
5520
Spark Core快速入门系列(12) |  变量与累加器问题
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
Lansonli
2021/10/09
5700
Spark Core项目实战 | Top10 热门品类
本实战项目的数据是采集自电商的用户行为数据. 主要包含用户的 4 种行为: 搜索, 点击, 下单和支付. 数据格式如下, 不同的字段使用下划线分割开_:
大数据真好玩
2020/09/04
1.2K0
【Spark数仓项目】需求三:地图位置解析进一步优化
因为全部调用高德API会造成高并发的问题,超出高德的调用规范,这也解释了为什么前面需求二我们只查找毕导用户。因此,在不给高德充钱的前提下,我们采用维表+高德api调用少数可以继续进行优化。
火之高兴
2024/07/25
1080
【Spark数仓项目】需求三:地图位置解析进一步优化
Spark系列(五)共享变量累加器
累加器是共享变量的一种,它提供了信息聚合的一种方法,提供了将工作节点中的值聚合到驱动器程序中的简单语法,累加器常常被作为Rdd的map,filter操作的副产品,这仍然是由于行动操作之前的转化操作仍然是惰性的,只有进行了行动操作之后,才会触发累加器的求值操作。
张凝可
2019/08/22
5750
Spark2.3.0 共享变量
通常情况下,传递给 Spark 操作(例如 map 或 reduce)的函数是在远程集群节点上执行的,函数中使用的变量,在多个节点上执行时是同一变量的多个副本。这些变量被拷贝到每台机器上,并且在远程机器上对变量的更新不会回传给驱动程序。在任务之间支持通用的,可读写的共享变量是效率是非常低的。所以,Spark 提供了两种类型的共享变量 : 广播变量(broadcast variables)和 累加器(accumulators)。
smartsi
2019/08/07
1.1K0
Spark自定义累加器的实现
Spark自定义累加器的实现 Java版本: package com.luoxuehuan.sparkproject.spark; import org.apache.spark.AccumulatorParam; /** * * @author lxh * implements AccumulatorParam<String> * String格式 进行分布式计算 * 也可以用自己的model ,但必须是可以序列化的! * 然后基于这种特殊的数据格式,可以实现自己复杂的分布式计算逻辑
Albert陈凯
2018/04/04
1.9K0
1 Spark入门各种map的操作,java语言
Spark基本操作主要就是各种map、reduce,这一篇从各种map开始。由于scala不熟悉,而且语法太精简,虽然代码量少了,但是可读性差了不少,就还是用Java来操作。
天涯泪小武
2019/01/17
7570
Spark踩坑记:共享变量
本文主要介绍了如何在Spark中通过共享变量和广播变量来提高数据处理效率和处理速度。作者通过实例介绍了共享变量和广播变量的使用方法,包括使用方式、注意事项以及示例代码。同时,作者还针对广播变量的更新难易程度提出了一种解决方案。该方案可以有效地解决广播变量更新困难的问题,在每天千万级的数据实时流统计中表现稳定。
肖力涛
2017/04/19
3.6K0
Spark踩坑记:共享变量
Apache Spark 2.2.0 中文文档 - Spark 编程指南 | ApacheCN
本文介绍了 Apache Spark 的 RDD 程序设计指南,从 RDD 的基本概念、创建与操作、缓存与存储、性能优化等方面进行了详细阐述,并提供了丰富的实例和代码以帮助读者更好地理解和掌握 RDD 的使用方法。
片刻
2018/01/05
1.7K0
Apache Spark 2.2.0 中文文档 - Spark 编程指南 | ApacheCN
【Spark篇】---Spark中广播变量和累加器
Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量。
LhWorld哥陪你聊算法
2018/09/13
1.1K0
【Spark篇】---Spark中广播变量和累加器
Spark——RDD操作详解
一、基本RDD 1、针对各个元素的转化操作 最常用的转化操作是map()和filter()。转化操作map()J接收一个函数,把这个函数用于RDD中的每一个元素,将函数的返回结果作为结果RDD中对应元素。而转化操作filter()则接收一个函数,将RDD满足该函数的元素放入新的RDD中返回。map()的返回值类型不需要和输入类型一样。 从一个RDD变成另外一个RDD。lazy,懒执行 。比如根据谓词匹配筛选数据就是一个转换操作。 例:求平均值 Scala:
天涯泪小武
2019/05/26
1.7K0
Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)
  本实战项目的数据是采集自电商的用户行为数据.   主要包含用户的 4 种行为: 搜索, 点击, 下单和支付.   数据格式如下, 不同的字段使用下划线分割开_:
不温卜火
2020/10/28
1.1K0
Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)
Accumulator
Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。
编程那点事
2023/02/25
2120
Spark之【RDD编程进阶】——累加器与广播变量的使用
上一篇博客博主已经为大家介绍了Spark中数据读取与保存,这一篇博客则带来了Spark中的编程进阶。其中就涉及到了累加器与广播变量的使用。
大数据梦想家
2021/01/27
6780
Spark之【RDD编程进阶】——累加器与广播变量的使用
spark源码系列之累加器实现机制及自定义累加器
一,基本概念 累加器是Spark的一种变量,顾名思义该变量只能增加。有以下特点: 1,累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加。 2,累加器不会改变Spark L
Spark学习技巧
2018/01/30
2.4K0
spark源码系列之累加器实现机制及自定义累加器
相关推荐
Spark累加器(Accumulator)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验