Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark源码之Standalone模式下master持久化引擎讲解

Spark源码之Standalone模式下master持久化引擎讲解

作者头像
Spark学习技巧
发布于 2018-01-30 09:47:36
发布于 2018-01-30 09:47:36
7080
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

Spark源码之Standalone模式下master持久化引擎讲解

Standalone 模式下Master为了保证故障恢复,会持久化一些重要的数据,来避免master故障导致集群不可用这种情况(也即单点故障)。目前,有四种持久化策略:

1,基于zookeeper的持久化引擎。

2,基于文件的持久化引擎。

3,用户自定义持久化引擎。

4,不使用持久化引擎。

在master的OnStart方法中,对应的源码如下:

代码语言:js
AI代码解释
复制
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
 case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
 val zkFactory =
 new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
 case "FILESYSTEM" =>
 val fsFactory =
 new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
 case "CUSTOM" =>
 val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
 val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
 case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_

默认,情况下是无持久化引擎,也就是没有ha策略。Spark提供的可用的ha策略:基于文件系统的和基于zookeeper。配置方法如下:

基于文件系统:

property

Meaning

spark.deploy.recoveryMode

FILESYSTEM

spark.deploy.recoveryDirectory

用来恢复状态的目录

基于zookeeper:

property

Meaning

spark.deploy.recoveryMode

ZOOKEEPER

spark.deploy.zookeeper.url

e.g., 192.168.1.100:2181,192.168.1.101:2181

spark.deploy.zookeeper.dir

zookeeper保存恢复状态的目录

生产环境中可用的是基于zookeeper的持久化引擎。

基于zookeeper持久化策略,会允许我们同时运行多个master,然后支持leader选举,最终是一个leader,其余是standby。

Spark的Master的leader选举实现

Spark源码里面使用的是CuratorFramework,跟zookeeper交流。该框架有以下特点:

1,自动连接管理:自动处理zookeeper的连接和重试存在一些潜在的问题;可以watch NodeDataChanged event和获取updateServerList;Watches可以自动被Cruator recipes删除;

2,更加简洁的API:简化raw zookeeper方法,事件等;提供现代流式API接口

3,Recipe实现:leader选举,分布式锁,path缓存,和watcher,分布式队列,Barriers等。

Spark源码里面使用了LeaderLatch实现选举功能。这个实现实际是基于zookeeper的节点类型来做,zookeeper有四种节点类型:

1,持久节点(PERSISTENT)

节点创建后,会一直存在,不会因客户端会话失效而删除;

2,持久顺序节点(PERSISTENT_SEQUENTIAL)

基本特性与持久节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;

3,临时节点(EPHEMERAL)

客户端会话失效或连接关闭后,该节点会被自动删除,且不能再临时节点下面创建子节点。

4,临时顺序节点(EPHEMERAL_SEQUENTIAL)

基本特性与临时节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;

LeaderLatch实现leader选举实际上基于临时顺序节点来做的。

Spark源码里面基于zookeeper的leader选举具体实现过程源码如下:

在master的OnStart方法里面

leaderElectionAgent = leaderElectionAgent_

实际是在构建zookeeper的持久化引擎的时候,构建的

(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))

在createLeaderElectionAgent方法里面构建了

new ZooKeeperLeaderElectionAgent(master, conf)

该对象,继承了LeaderLatchListener,并且覆盖了notLeader和isLeader两个重要的方法具体。

在ZooKeeperLeaderElectionAgent构建的时候调用了自己的start方法,该方法构建了LeaderLatch,并添加ZooKeeperLeaderElectionAgent作为其listener。

代码语言:js
AI代码解释
复制
private def start() {
  logInfo("Starting ZooKeeper LeaderElection agent")
 zk = SparkCuratorUtil.newClient(conf)
 leaderLatch = new LeaderLatch(zk, WORKING_DIR)
 leaderLatch.addListener(this)
 leaderLatch.start()
}

Leader选举在zookeeper的临时节点的路径为

val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

执行ZooKeeperLeaderElectionAgent对象的start方法之后,每当该对象所在的master由standby变为Leader的时候,会调用isLeader()方法。由Leader变为StandBy的时候会调用notLeader()。我们就可以在这两个方法里实现自己要的状态切换的相关操作。

代码语言:js
AI代码解释
复制
override def isLeader() {
  synchronized {
 // could have lost leadership by now.
 if (!leaderLatch.hasLeadership) {
 return
 }

    logInfo("We have gained leadership")
    updateLeadershipStatus(true)
  }
}

override def notLeader() {
  synchronized {
 // could have gained leadership by now.
 if (leaderLatch.hasLeadership) {
 return
 }

    logInfo("We have lost leadership")
    updateLeadershipStatus(false)
  }
}

要实现,我们自己应用的ha,也可基于此方法。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-06-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark 源码(3) - Master 启动之持久化引擎和选举代理
上回讲到,Master 的 main 方法中,创建了 RpcEnv 和 Master 的 Endpoint,紧接着就开始执行 Endpoint 的生命周期方法 onStart() 方法,今天就从这里开始。
kk大数据
2021/10/12
4280
Spark 源码(3) - Master 启动之持久化引擎和选举代理
深入理解Spark 2.1 Core (八):Standalone模式容错及HA的原理与源码分析
第五、第六、第七篇博文,我们讲解了Standalone模式集群是如何启动的,一个App起来了后,集群是如何分配资源,Worker启动Executor的,Task来是如何执行它,执行得到的结果如何处理,以及app退出后,分配了的资源如何回收。
小爷毛毛_卓寿杰
2019/02/13
8470
深入理解Spark 2.1 Core (八):Standalone模式容错及HA的原理与源码分析
基于zookeeper leader选举方式一
一,基本介绍 Curator Framework是一个针对zookeeper做的搞层次的API,极大地简化了zookeeper的使用。它基于zookeeper构建了很多功能,处理复杂的链接管理,重试操作。下面是它的一些特点: 1,自动连接管理。 A),存在的潜在错误情况,需要ZooKeeper客户端创建连接和/或重试操作。Curator 自动和透明(主要)处理这些情况。 B),监视NodeDataChanged事件,并根据需要调用updateServerList()。 C),Watches 会被Curato
Spark学习技巧
2018/01/30
1.3K0
基于zookeeper leader选举方式一
Curator实现基于zookeeper leader选举
Curator Framework是一个针对zookeeper做的搞层次的API,极大地简化了zookeeper的使用。它基于zookeeper构建了很多功能,处理复杂的链接管理,重试操作。下面是它的一些特点:
Spark学习技巧
2020/04/24
9590
Spark内核分析之Spark的HA源码分析
        Spark作业运行的集群环境有两种,分别基于standalone模式和Yarn集群模式。我们知道Yarn集群提供了HA来保证了集群的高可用,而standalone也提供了一种集群高可用的方法,即通过配置可以实现双master机制,保证在一个master挂掉以后,另外一个master立即启用。spark的主备切换提供了两种模式,一种是基于文件系统的,另外一种是基于zookeeper的。下面我们来看看spark集群的master主备切换是怎么实现的,如下图所示;
z小赵
2018/09/05
6620
Spark内核分析之Spark的HA源码分析
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障(SPOF)的问题。
Lansonli
2021/10/09
6350
Spark快速入门系列(6) | Spark环境搭建—standalone(3) 配置HA高可用模式
Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障的问题。
不温卜火
2020/10/28
5490
Spark快速入门系列(6) | Spark环境搭建—standalone(3) 配置HA高可用模式
Spark环境搭建——HA高可用模式
本篇博客,Alice为大家带来的是Spark的HA高可用环境搭建的教程。
大数据梦想家
2021/01/27
1.1K0
Spark环境搭建——HA高可用模式
Spark全分布部署和HA
基于Zookeeper的Standby Masters ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到ZooKeeper,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。
编程那点事
2023/02/25
2280
Spark全分布部署和HA
聊聊curator recipes的LeaderLatch
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
code4it
2018/10/18
1.5K0
Spark源码分析————start-all
org.apache.spark.deploy.master.Master 让我们先来看看main()方法
俺也想起舞
2019/07/24
6540
spark总体概况
1. spark vs hadoop PS:Databricks团队特别说明,为了和Hadoop对比,这次用于排序的Spark集群没有使用它们的内存缓存机制,他们也是用硬盘存储的中间结果! http
我是攻城师
2018/05/11
7170
聊聊storm nimbus的LeaderElector
org/apache/storm/daemon/nimbus/Nimbus.java
code4it
2018/10/18
5780
Flink基于zookeeper的高可用实现源码分析
Flink中JobMaster、ResourceManager、Dispatcher、WebMonitorEndpoint提供了基于zookeeper高可用,涉及到leader选举与监听, leader选举基于zookeeper开源客户端CuratorFramework 的LeaderLatch方式实现,监听则通过NodeCache实现。基于此Flink提供了zookeeper高可用ZooKeeperHaServices, 通过该工具类可以创建LeaderElectionService与LeaderRetrievalService,包含了对应与zookeeper的znode节点分别是
Flink实战剖析
2022/04/18
6590
[spark] Standalone模式下Master、WorKer启动流程
而Standalone 作为spark自带cluster manager,需要启动Master和Worker守护进程,本文将从源码角度解析两者的启动流程。Master和Worker之间的通信使用的是基于netty的RPC,Spark的Rpc推荐看深入解析Spark中的RPC。
UFO
2018/08/29
1.7K0
Zookeeper开源客户端Curator之Master/Leader选举
本文介绍了如何使用Curator实现分布式协调,包括Leader选举、分布式锁和分布式队列。首先介绍了Leader选举的原理和常用算法,然后通过实例展示了如何使用Curator实现分布式协调。最后,总结了Curator在分布式协调中的应用场景和注意事项。
程序新视界
2018/01/08
2.3K0
Spark部署
  Spark的部署让人有点儿困惑,有些需要注意的事项,本来我已经装成功了YARN模式的,但是发现了一些问题,出现错误看日志信息,完全看不懂那个错误信息,所以才打算翻译Standalone的部署的文章
岑玉海
2018/02/28
1.2K0
Spark部署
ZooKeeper 的应用场景
ZooKeepr 提供基于类似于文件系统的目录节点树方式的数据存储,这是一个共享的内存中的树型结构。有几个概念需要关注一下。
runzhliu
2020/08/06
1.7K0
ZooKeeper 的应用场景
Spark的容错机制
摘 要 分布式系统通常在一个机器集群上运行,同时运行的几百台机器中某些出问题的概率大大增加,所以容错设计是分布式系统的一个重要能力。 容错体系概述 Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式
天策
2018/06/22
2.2K0
Spark之三大集群模式—详解(3)
Standalone集群使用了分布式计算中的master-slave模型, master是集群中含有master进程的节点 slave是集群中的worker节点含有Executor进程
刘浩的BigDataPath
2021/04/13
1.3K0
Spark之三大集群模式—详解(3)
相关推荐
Spark 源码(3) - Master 启动之持久化引擎和选举代理
更多 >
领券
一站式MCP教程库,解锁AI应用新玩法
涵盖代码开发、场景应用、自动测试全流程,助你从零构建专属AI助手
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档