首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >分布式作业系统 Elastic-Job-Lite 源码分析——主节点选举

分布式作业系统 Elastic-Job-Lite 源码分析——主节点选举

作者头像
企鹅号小编
发布于 2018-02-02 03:30:51
发布于 2018-02-02 03:30:51
7880
举报
文章被收录于专栏:编程编程

1. 概述

本文主要分享 Elastic-Job-Lite 主节点选举。

涉及到主要类的类图如下:

.

2. 为什么需要选举主节点

首先我们来看一段官方对 Elastic-Job-Lite 的介绍:

Elastic-Job-Lite 定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。

无中心化,意味着 Elastic-Job-Lite 不存在一个中心执行一些操作,例如:分配作业分片项。Elastic-Job-Lite 选举主节点,通过主节点进行作业分片项分配。目前,必须在主节点执行的操作有:分配作业分片项,调解分布式作业不一致状态。

另外,主节点的选举是以作业为维度。例如:有一个 Elastic-Job-Lite 集群有三个作业节点A、B、C,存在两个作业 a、b

,可能 a作业的主节点是 C,b作业的主节点是A。

3. 选举主节点

调用 LeaderService#electLeader()选举主节点。

大体流程如下

.

实现代码如下:

// LeaderService.java/** * 选举主节点. */public void electLeader() { log.debug("Elect a new leader now."); jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback()); log.debug("Leader election completed."); }// JobNodeStorage.javapublic void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) { try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) { latch.start(); latch.await(); callback.execute(); } catch (final Exception ex) { handleException(ex); } }// LeaderElectionExecutionCallback.javaclass LeaderElectionExecutionCallback implements LeaderExecutionCallback { @Override public void execute() { if (!hasLeader()) { // 当前无主节点 jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); } } }

使用 Curator LeaderLatch 分布式锁,保证同一时间有且仅有一个工作节点能够调用LeaderElectionExecutionCallback#execute()方法执行主节点设置。Curator LeaderLatch 在《Elastic-Job-Lite 源码分析 —— 注册中心》「3.1 在主节点执行操作」有详细解析。

在 LeaderElectionExecutionCallback#execute()为什么要调用 #hasLeader() 呢?

LeaderLatch 只保证同一时间有且仅有一个工作节点,在获得分布式锁的工作节点结束逻辑后,第二个工作节点会开始逻辑,如果不判断当前是否有主节点,原来的主节点会被覆盖。

.

选举成功后,Zookeeper 存储作业的主节点:/$/leader/electron/instance为当前节点。该节点为临时节点

[zk: localhost:2181(CONNECTED) 7] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance192.168.16.137@-@82496

选举主节点时机

第一种,注册作业启动信息时。

.

新的作业启动时,即能保证选举出主节点。

当该作业不存在主节点时,当前作业节点成为主节点。

当该作业存在主节点,当前作业节主节点不变。

第二种,节点数据发生变化时。

class LeaderElectionJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) { leaderService.electLeader(); } } }

符合重新选举主节点分成两种情况。

主动选举 #isActiveElection(...)

.

当作业被禁用( LiteJobConfiguration.disabled = true )时,作业是不存在主节点的。那有同学就有疑问了?LeaderService#electLeader()没做这个限制呀,作业注册作业启动信息时也进行了选举。在「4. 删除主节点」小结,我们会解开这个答案。这里大家先记住这个结论。

根据上面我们说的结论,这里就很好理解了,#isActiveElection()方法判断了两个条件:( 1 ) 不存在主节点;( 2 ) 开启作业,不再禁用,因此需要进行主节点选举落。

这里判断开启作业的方法 #isLocalServerEnabled(...)有点特殊,它不是通过作业节点是否处于开启状态,而是该数据不是将作业节点更新成关闭状态。举个例子:作业节点处于禁用状态,使用运维平台设置作业节点开启,会进行主节点选举;作业节点处于开启状态,使用运维平台设置作业节点禁用,不会进行主节点选举。

被动选举 #isPassiveElection(...)

private boolean isPassiveElection(final String path, final Type eventType) { return isLeaderCrashed(path, eventType) // 主节点 Crashed && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()); // 当前节点正在运行中(未挂掉)} private boolean isLeaderCrashed(final String path, final Type eventType) { return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType; }

当主节点因为各种情况( 「4. 删除主节点」会列举 )被删除,需要重新进行选举。对的,必须主节点被删除后才可以重新进行选举。

#isPassiveElection(...) 方法判断了两个条件:( 1 ) 原主节点被删除;( 2 ) 当前节点正在运行中(未挂掉),可以参加主节点选举。

#isLeaderCrashed(...)方法虽然命名带有 Crashed英文,实际主作业节点正常退出也符合被动选举条件。

等待主节点选举完成

必须在主节点执行的操作,执行之前,需要判断当前节点是否为主节点。如果主节点已经选举好,可以直接进行判断。但是,不排除主节点还没选举到,因而需要阻塞等待到主节点选举完成后才能进行判断。

实现代码如下:

// LeaderService.java/** * 判断当前节点是否是主节点.* * 如果主节点正在选举中而导致取不到主节点, 则阻塞至主节点选举完成再返回.* * @return 当前节点是否是主节点 */public boolean isLeaderUntilBlock() { // 不存在主节点 && 有可用的服务器节点 while (!hasLeader() && serverService.hasAvailableServers()) { log.info("Leader is electing, waiting for {} ms", 100); BlockUtils.waitingShortTime(); if (!JobRegistry.getInstance().isShutdown(jobName) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) { // 当前服务器节点可用 electLeader(); } } // 返回当前节点是否是主节点 return isLeader(); }

调用 BlockUtils#waitingShortTime()方法,选举不到主节点进行等待,避免不间断、无间隔的进行主节点选举。

4. 删除主节点

有主节点的选举,必然有主节点的删除,否则怎么进行重新选举。

实现代码如下:

.

删除主节点时机

第一种,主节点进程正常关闭时。

public final class JobShutdownHookPlugin extends ShutdownHookPlugin { @Override public void shutdown() { CoordinatorRegistryCenter regCenter = JobRegistry.getInstance().getRegCenter(jobName); if (null == regCenter) { return; } LeaderService leaderService = new LeaderService(regCenter, jobName); if (leaderService.isLeader()) { leaderService.removeLeader(); // 移除主节点 } new InstanceService(regCenter, jobName).removeInstance(); } }

这个比较好理解,退出进程,若该进程为主节点,需要将自己移除。

第二种,主节点进程 CRASHED 时。

$/leader/electron/instance是临时节点,主节点进程 CRASHED 后,超过最大会话时间,Zookeeper 自动进行删除,触发重新选举逻辑。

第三种,作业被禁用时。

class LeaderAbdicationJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (leaderService.isLeader() && isLocalServerDisabled(path, data)) { leaderService.removeLeader(); } } private boolean isLocalServerDisabled(final String path, final String data) { return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data); } }

这里就解答上面我们遗留的疑问。被禁用的作业注册作业启动信息时即使进行了主节点选举,也会被该监听器处理,移除该选举的主节点。

第四种,主节点进程远程关闭。

// InstanceShutdownStatusJobListener.javaclass InstanceShutdownStatusJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused() // 作业未暂停调度 && isRemoveInstance(path, eventType) // 移除【运行实例】事件 && !isReconnectedRegistryCenter()) { // 运行实例被移除 schedulerFacade.shutdownInstance(); } } private boolean isRemoveInstance(final String path, final Type eventType) { return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == eventType; } private boolean isReconnectedRegistryCenter() { return instanceService.isLocalJobInstanceExisted(); } }// SchedulerFacade.java/*** 终止作业调度. */public void shutdownInstance() { if (leaderService.isLeader()) { leaderService.removeLeader(); // 移除主节点 } monitorService.close(); if (reconcileService.isRunning()) { reconcileService.stopAsync(); } JobRegistry.getInstance().shutdown(jobName); }

远程关闭作业节点有两种方式:

zkClient 发起命令:rmr /$/$/instances/$。

运维平台发起 Shutdown操作。Shutdown操作实质上就是第一种。

.

本文来自企鹅号 - java交流学习媒体

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文来自企鹅号 - java交流学习媒体

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
分布式作业 Elastic-Job-Lite 源码分析 —— 主节点选举
无中心化,意味着 Elastic-Job-Lite 不存在一个中心执行一些操作,例如:分配作业分片项。Elastic-Job-Lite 选举主节点,通过主节点进行作业分片项分配。目前,必须在主节点执行的操作有:分配作业分片项,调解分布式作业不一致状态。
芋道源码
2018/11/22
5930
源码分析ElasticJob分片机制
ElasticJob分片工作机制: 1、ElasticJob在启动时,首先会启动是否需要重新分片的监听器。 代码入口如下:
丁威
2019/06/10
1.7K0
源码分析ElasticJob分片机制
分布式作业 Elastic-Job-Lite 源码分析 —— 注册中心监听器
在《Elastic-Job-Lite 源码分析 —— 作业初始化》「3.2.4」注册作业启动信息,我们看到作业初始化时,会开启所有注册中心监听器:
芋道源码
2018/10/26
8710
分布式做系统 Elastic-Job-Lite 源码分析 —— 作业初始化
作业注册表( JobRegistry ),维护了单个 Elastic-Job-Lite 进程内作业相关信息,可以理解成其专属的 Spring IOC 容器。因此,其本身是一个单例。
芋道源码
2018/09/30
6140
分布式做系统 Elastic-Job-Lite 源码分析 —— 作业初始化
elastic-job-lite 既然去中心化,为何要选举主节点
开篇语 上一篇文章介绍了elastic-job-lite的入门,架构。使用和一些流程,里面提到elastic-job-lite是一个去中心化,轻量级的任务调度框架,那为什么elastic-jib-lite在启动时要选取主节点呢?难道我看错了,哈哈,不可能的,后文 elastic-job-lite简称ejl。 leader选举 ejl定位为轻量级,去中心化,其任务调度由各自的机器驱动,各台机器之间通过zk去协调,ejl为每个任务都创建一个JobScheduler,而在JobScheduler的初始化中回为每个
阿伟
2019/07/22
1.8K0
elastic-job-lite 既然去中心化,为何要选举主节点
源码分析ElasticJob选主实现原理
ElasticJob分布式调度服务器包含两个角色分布为主服务器、从服务器。这里的主从服务器并不是传统意义上的主备。
丁威
2019/06/10
8880
源码分析ElasticJob选主实现原理
分布式作业 Elastic-Job-Lite 源码分析 —— 注册中心
ZookeeperRegistryCenter,基于 Zookeeper 注册中心。从上面的类图可以看到,ZookeeperRegistryCenter 实现 CoordinatorRegistryCenter 接口,CoordinatorRegistryCenter 继承 RegistryCenter 接口。
芋道源码
2018/10/26
1.1K0
elastic-job选主过程
ElasticJob的主服务器的职责是根据当前存活的任务调度服务器生成分片信息,然后拉取属于该分片的任务数据执行任务。为了避免分片信息的不统一,ElasticJob必须从所有的调度服务器中选择一台为主服务器,由该台服务器统一计算分片信息,其他服务根据该分片信息进行任务调度。
leobhao
2022/06/28
5190
elastic-job选主过程
分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业失效转移
摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-failover/ 「芋道源码」欢迎转载,保留摘要,谢谢! 本文基于 Elastic-Job V2.1.5 版本分享 1. 概述 2. 作业节点崩溃监听 3. 作业失效转移 4. 获取作业分片上下文集合 5. 监听作业失效转移功能关闭 666. 彩蛋 ---- 1. 概述 本文主要分享 Elastic-Job-Lite 作业失效转移。 当作业节点执行作业异常崩溃时,其所分配的作业分片项在下次重新分片之前不会被重新
芋道源码
2018/12/13
7000
分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业失效转移
Elastic-Job2.1.5源码-基于Zookeeper分布式锁实现选举作业主节点原理
大家好,本文给大家介绍一下Elastic-Job 基于Zookeeper分布式锁实现选举作业主节点原理,本文主要介绍Elastic-Job如何使用Zookeeper分布式锁进行选举作业主节点,分布式锁的原理和注意事项。
宋小生
2022/12/14
6270
Elastic-Job2.1.5源码-基于Zookeeper分布式锁实现选举作业主节点原理
Elastic-Job2.1.5源码-作业高可用的失效转移功能实现原理动画
失效转移是作业补偿的另外一个场景,作业如果在执行过程中执行节点崩溃了那本次作业将无法正常执行完成,导致作业执行异常,这个时候就需要我们执行失效转移将崩溃的作业分片转移到其他可以正常执行的机器上面进行作业的补偿执行,失效转移的过程一共分为如下几步:
宋小生
2022/12/14
4780
分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业监控服务
本文主要分享 Elastic-Job-Lite 作业监控服务。内容对应《官方文档 —— DUMP作业运行信息》。
芋道源码
2018/12/29
6850
分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业监控服务
分布式作业系统 Elastic-Job-Lite 源码分析 —— 自诊断修复
ReconcileService 继承 Google Guava AbstractScheduledService 抽象类,实现 #scheduler()、#runOneIteration() 方法,达到周期性校验注册中心数据与真实作业状态的一致性。
芋道源码
2019/02/27
7320
分布式作业系统 Elastic-Job-Lite 源码分析 —— 自诊断修复
分布式作业 Elastic-Job-Lite 源码分析 —— 作业配置
一个作业( ElasticJob )的调度,需要配置独有的一个作业调度器( JobScheduler ),两者是 1 : 1 的关系。这点大家要注意下,当然下文看代码也会看到。
芋道源码
2018/09/30
1.4K0
分布式作业 Elastic-Job-Lite 源码分析 —— 作业配置
分布式作业 Elastic-Job-Lite 源码分析 —— 作业数据存储
JobNodePath,作业节点路径类。作业节点是在普通的节点前加上作业名称的前缀。
芋道源码
2018/10/26
4690
Elastic-Job系列一之执行器注册启动
以springboot为例看下elastic-job的执行器启动流程,启动配置类为elasticjob-lite-spring-boot-starter中的ElasticJobLiteAutoConfiguration,如下
用户9511949
2024/07/07
5200
分布式作业 Elastic-Job-Lite 源码分析 —— 作业分片
1. 概述2. 作业分片条件3. 分配作业分片项4. 获取作业分片上下文集合666. 彩----
芋道源码
2018/12/07
5730
分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业监听器
摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-listener/ 「芋道源码」欢迎转载,保留摘要,谢谢!
芋道源码
2018/12/19
8370
分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业监听器
分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业执行
Lite调度作业( LiteJob ),作业被调度后,调用 #execute() 执行作业。
芋道源码
2018/09/30
1.9K0
分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业执行
elastic-job失效转移和错过补偿
如果一个任务节点宕机后,则一次任务调度期间,一部分数据将不会被处理,为了解决由于任务节点宕机引起任务一个调度周期的一次任务执行部分数据未处理,可以设置开启故障失效转移,将本次任务转移到其他正常的节点上执行。
leobhao
2022/06/28
1.1K0
推荐阅读
相关推荐
分布式作业 Elastic-Job-Lite 源码分析 —— 主节点选举
更多 >
领券
一站式MCP教程库,解锁AI应用新玩法
涵盖代码开发、场景应用、自动测试全流程,助你从零构建专属AI助手
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档